Merge branch 'develop' into peerDAS

This commit is contained in:
Manu NALEPA
2025-07-23 11:07:15 +02:00
60 changed files with 758 additions and 366 deletions

View File

@@ -2,18 +2,28 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["common.go"],
srcs = [
"common.go",
"header.go",
],
importpath = "github.com/OffchainLabs/prysm/v6/api/apiutil",
visibility = ["//visibility:public"],
deps = ["//consensus-types/primitives:go_default_library"],
deps = [
"//consensus-types/primitives:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["common_test.go"],
srcs = [
"common_test.go",
"header_test.go",
],
embed = [":go_default_library"],
deps = [
"//consensus-types/primitives:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

122
api/apiutil/header.go Normal file
View File

@@ -0,0 +1,122 @@
package apiutil
import (
"mime"
"sort"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
)
type mediaRange struct {
mt string // canonicalised mediatype, e.g. "application/json"
q float64 // quality factor (01)
raw string // original string useful for logging/debugging
spec int // 2=exact, 1=type/*, 0=*/*
}
func parseMediaRange(field string) (mediaRange, bool) {
field = strings.TrimSpace(field)
mt, params, err := mime.ParseMediaType(field)
if err != nil {
log.WithError(err).Debug("Failed to parse header field")
return mediaRange{}, false
}
r := mediaRange{mt: mt, q: 1, spec: 2, raw: field}
if qs, ok := params["q"]; ok {
v, err := strconv.ParseFloat(qs, 64)
if err != nil || v < 0 || v > 1 {
log.WithField("q", qs).Debug("Invalid quality factor (01)")
return mediaRange{}, false // skip invalid entry
}
r.q = v
}
switch {
case mt == "*/*":
r.spec = 0
case strings.HasSuffix(mt, "/*"):
r.spec = 1
}
return r, true
}
func hasExplicitQ(r mediaRange) bool {
return strings.Contains(strings.ToLower(r.raw), ";q=")
}
// ParseAccept returns media ranges sorted by q (desc) then specificity.
func ParseAccept(header string) []mediaRange {
if header == "" {
return []mediaRange{{mt: "*/*", q: 1, spec: 0, raw: "*/*"}}
}
var out []mediaRange
for _, field := range strings.Split(header, ",") {
if r, ok := parseMediaRange(field); ok {
out = append(out, r)
}
}
sort.SliceStable(out, func(i, j int) bool {
ei, ej := hasExplicitQ(out[i]), hasExplicitQ(out[j])
if ei != ej {
return ei // explicit beats implicit
}
if out[i].q != out[j].q {
return out[i].q > out[j].q
}
return out[i].spec > out[j].spec
})
return out
}
// Matches reports whether content type is acceptable per the header.
func Matches(header, ct string) bool {
for _, r := range ParseAccept(header) {
switch {
case r.q == 0:
continue
case r.mt == "*/*":
return true
case strings.HasSuffix(r.mt, "/*"):
if strings.HasPrefix(ct, r.mt[:len(r.mt)-1]) {
return true
}
case r.mt == ct:
return true
}
}
return false
}
// Negotiate selects the best server type according to the header.
// Returns the chosen type and true, or "", false when nothing matches.
func Negotiate(header string, serverTypes []string) (string, bool) {
for _, r := range ParseAccept(header) {
if r.q == 0 {
continue
}
for _, s := range serverTypes {
if Matches(r.mt, s) {
return s, true
}
}
}
return "", false
}
// PrimaryAcceptMatches only checks if the first accept matches
func PrimaryAcceptMatches(header, produced string) bool {
for _, r := range ParseAccept(header) {
if r.q == 0 {
continue // explicitly unacceptable skip
}
return Matches(r.mt, produced)
}
return false
}

174
api/apiutil/header_test.go Normal file
View File

@@ -0,0 +1,174 @@
package apiutil
import (
"testing"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
func TestParseAccept(t *testing.T) {
type want struct {
mt string
q float64
spec int
}
cases := []struct {
name string
header string
want []want
}{
{
name: "empty header becomes */*;q=1",
header: "",
want: []want{{mt: "*/*", q: 1, spec: 0}},
},
{
name: "quality ordering then specificity",
header: "application/json;q=0.2, */*;q=0.1, application/xml;q=0.5, text/*;q=0.5",
want: []want{
{mt: "application/xml", q: 0.5, spec: 2},
{mt: "text/*", q: 0.5, spec: 1},
{mt: "application/json", q: 0.2, spec: 2},
{mt: "*/*", q: 0.1, spec: 0},
},
},
{
name: "invalid pieces are skipped",
header: "text/plain; q=boom, application/json",
want: []want{{mt: "application/json", q: 1, spec: 2}},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := ParseAccept(tc.header)
gotProjected := make([]want, len(got))
for i, g := range got {
gotProjected[i] = want{mt: g.mt, q: g.q, spec: g.spec}
}
require.DeepEqual(t, gotProjected, tc.want)
})
}
}
func TestMatches(t *testing.T) {
cases := []struct {
name string
accept string
ct string
matches bool
}{
{"exact match", "application/json", "application/json", true},
{"type wildcard", "application/*;q=0.8", "application/xml", true},
{"global wildcard", "*/*;q=0.1", "image/png", true},
{"explicitly unacceptable (q=0)", "text/*;q=0", "text/plain", false},
{"no match", "image/png", "application/json", false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := Matches(tc.accept, tc.ct)
require.Equal(t, tc.matches, got)
})
}
}
func TestNegotiate(t *testing.T) {
cases := []struct {
name string
accept string
serverTypes []string
wantType string
ok bool
}{
{
name: "highest quality wins",
accept: "application/json;q=0.8,application/xml;q=0.9",
serverTypes: []string{"application/json", "application/xml"},
wantType: "application/xml",
ok: true,
},
{
name: "wildcard matches first server type",
accept: "*/*;q=0.5",
serverTypes: []string{"application/octet-stream", "application/json"},
wantType: "application/octet-stream",
ok: true,
},
{
name: "no acceptable type",
accept: "image/png",
serverTypes: []string{"application/json"},
wantType: "",
ok: false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got, ok := Negotiate(tc.accept, tc.serverTypes)
require.Equal(t, tc.ok, ok)
require.Equal(t, tc.wantType, got)
})
}
}
func TestPrimaryAcceptMatches(t *testing.T) {
tests := []struct {
name string
accept string
produced string
expect bool
}{
{
name: "prefers json",
accept: "application/json;q=0.9,application/xml",
produced: "application/json",
expect: true,
},
{
name: "wildcard application beats other wildcard",
accept: "application/*;q=0.2,*/*;q=0.1",
produced: "application/xml",
expect: true,
},
{
name: "json wins",
accept: "application/xml;q=0.8,application/json;q=0.9",
produced: "application/json",
expect: true,
},
{
name: "json loses",
accept: "application/xml;q=0.8,application/json;q=0.9,application/octet-stream;q=0.99",
produced: "application/json",
expect: false,
},
{
name: "json wins with non q option",
accept: "application/xml;q=0.8,image/png,application/json;q=0.9",
produced: "application/json",
expect: true,
},
{
name: "json not primary",
accept: "image/png,application/json",
produced: "application/json",
expect: false,
},
{
name: "absent header",
accept: "",
produced: "text/plain",
expect: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := PrimaryAcceptMatches(tc.accept, tc.produced)
require.Equal(t, got, tc.expect)
})
}
}

View File

@@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"@com_github_rs_cors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],

View File

@@ -7,6 +7,7 @@ import (
"strings"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/apiutil"
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
)
@@ -74,42 +75,10 @@ func ContentTypeHandler(acceptedMediaTypes []string) Middleware {
func AcceptHeaderHandler(serverAcceptedTypes []string) Middleware {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
acceptHeader := r.Header.Get("Accept")
// header is optional and should skip if not provided
if acceptHeader == "" {
next.ServeHTTP(w, r)
if _, ok := apiutil.Negotiate(r.Header.Get("Accept"), serverAcceptedTypes); !ok {
http.Error(w, "Not Acceptable", http.StatusNotAcceptable)
return
}
accepted := false
acceptTypes := strings.Split(acceptHeader, ",")
// follows rules defined in https://datatracker.ietf.org/doc/html/rfc2616#section-14.1
for _, acceptType := range acceptTypes {
acceptType = strings.TrimSpace(acceptType)
if acceptType == "*/*" {
accepted = true
break
}
for _, serverAcceptedType := range serverAcceptedTypes {
if strings.HasPrefix(acceptType, serverAcceptedType) {
accepted = true
break
}
if acceptType != "/*" && strings.HasSuffix(acceptType, "/*") && strings.HasPrefix(serverAcceptedType, acceptType[:len(acceptType)-2]) {
accepted = true
break
}
}
if accepted {
break
}
}
if !accepted {
http.Error(w, fmt.Sprintf("Not Acceptable: %s", acceptHeader), http.StatusNotAcceptable)
return
}
next.ServeHTTP(w, r)
})
}

View File

@@ -11,7 +11,7 @@ import (
)
var (
// https://github.com/ethereum/consensus-specs/blob/dev/presets/mainnet/trusted_setups/trusted_setup_4096.json
// https://github.com/ethereum/consensus-specs/blob/master/presets/mainnet/trusted_setups/trusted_setup_4096.json
//go:embed trusted_setup_4096.json
embeddedTrustedSetup []byte // 1.2Mb
kzgContext *GoKZG.Context

View File

@@ -177,7 +177,7 @@ func (s *Service) processAttestations(ctx context.Context, disparity time.Durati
for _, a := range atts {
// Based on the spec, don't process the attestation until the subsequent slot.
// This delays consideration in the fork choice until their slot is in the past.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#validate_on_attestation
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/fork-choice.md#validate_on_attestation
nextSlot := a.GetData().Slot + 1
if err := slots.VerifyTime(s.genesisTime, nextSlot, disparity); err != nil {
continue

View File

@@ -206,9 +206,9 @@ func ParseWeakSubjectivityInputString(wsCheckpointString string) (*v1alpha1.Chec
// MinEpochsForBlockRequests computes the number of epochs of block history that we need to maintain,
// relative to the current epoch, per the p2p specs. This is used to compute the slot where backfill is complete.
// value defined:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#configuration
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#configuration
// MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2 (= 33024, ~5 months)
// detailed rationale: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
// detailed rationale: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
func MinEpochsForBlockRequests() primitives.Epoch {
return params.BeaconConfig().MinValidatorWithdrawabilityDelay +
primitives.Epoch(params.BeaconConfig().ChurnLimitQuotient/2)

View File

@@ -292,7 +292,7 @@ func TestMinEpochsForBlockRequests(t *testing.T) {
params.SetActiveTestCleanup(t, params.MainnetConfig())
var expected primitives.Epoch = 33024
// expected value of 33024 via spec commentary:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
// MIN_EPOCHS_FOR_BLOCK_REQUESTS is calculated using the arithmetic from compute_weak_subjectivity_period found in the weak subjectivity guide. Specifically to find this max epoch range, we use the worst case event of a very large validator size (>= MIN_PER_EPOCH_CHURN_LIMIT * CHURN_LIMIT_QUOTIENT).
//
// MIN_EPOCHS_FOR_BLOCK_REQUESTS = (

View File

@@ -131,7 +131,7 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, curre
return errors.Wrap(err, "entry filter")
}
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
verifier := s.newDataColumnsVerifier(roDataColumns, verification.ByRangeRequestDataColumnSidecarRequirements)
if err := verifier.ValidFields(); err != nil {

View File

@@ -75,7 +75,7 @@ data-columns
Computation of the maximum size of a DataColumnSidecar
------------------------------------------------------
https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/das-core.md#datacolumnsidecar
https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#datacolumnsidecar
class DataColumnSidecar(Container):

View File

@@ -104,6 +104,7 @@ go_library(
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr//net:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",

View File

@@ -1,7 +1,7 @@
/*
Package p2p implements the Ethereum consensus networking specification.
Canonical spec reference: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md
Canonical spec reference: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md
Prysm specific implementation design docs
- Networking Design Doc: https://docs.google.com/document/d/1VyhobQRkEjEkEPxmmdWvaHfKWn0j6dEae_wLZlrFtfU/view

View File

@@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -79,8 +80,8 @@ func (s *Service) disconnectFromPeerOnError(
// and validating the response from the peer.
func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Context, id peer.ID) error) {
// Peer map and lock to keep track of current connection attempts.
var peerLock sync.Mutex
peerMap := make(map[peer.ID]bool)
peerLock := new(sync.Mutex)
// This is run at the start of each connection attempt, to ensure
// that there aren't multiple inflight connection requests for the
@@ -108,6 +109,19 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, conn network.Conn) {
remotePeer := conn.RemotePeer()
log := log.WithField("peer", remotePeer)
direction := conn.Stat().Direction
// For some reason, right after a disconnection, this `ConnectedF` callback
// is called. We want to avoid processing this connection if the peer was
// disconnected too recently and if we are at the initiative of this connection.
// This is very probably a bug in libp2p.
if direction == network.DirOutbound {
if err := s.wasDisconnectedTooRecently(remotePeer); err != nil {
log.WithError(err).Debug("Skipping connection handler")
return
}
}
// Connection handler must be non-blocking as part of libp2p design.
go func() {
@@ -133,53 +147,56 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
return
}
// Do not perform handshake on inbound dials.
if conn.Stat().Direction == network.DirInbound {
_, err := s.peers.ChainState(remotePeer)
peerExists := err == nil
currentTime := prysmTime.Now()
// Wait for peer to initiate handshake
time.Sleep(timeForStatus)
// Exit if we are disconnected with the peer.
if s.host.Network().Connectedness(remotePeer) != network.Connected {
if direction != network.DirInbound {
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
return
}
// If peer hasn't sent a status request, we disconnect with them
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
statusMessageMissing.Inc()
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
return
}
if peerExists {
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
if err != nil {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
return
}
// Exit if we don't receive any current status messages from peer.
if updated.IsZero() {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
return
}
if updated.Before(currentTime) {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
return
}
}
s.connectToPeer(conn)
return
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
// The connection is inbound.
_, err = s.peers.ChainState(remotePeer)
peerExists := err == nil
currentTime := prysmTime.Now()
// Wait for peer to initiate handshake
time.Sleep(timeForStatus)
// Exit if we are disconnected with the peer.
if s.host.Network().Connectedness(remotePeer) != network.Connected {
return
}
// If peer hasn't sent a status request, we disconnect with them
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
statusMessageMissing.Inc()
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
return
}
if !peerExists {
s.connectToPeer(conn)
return
}
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
if err != nil {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
return
}
// Exit if we don't receive any current status messages from peer.
if updated.IsZero() {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
return
}
if updated.Before(currentTime) {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
return
}
@@ -220,6 +237,12 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
}
s.peers.SetConnectionState(peerID, peers.Disconnected)
if err := s.peerDisconnectionTime.Add(peerID.String(), time.Now(), cache.DefaultExpiration); err != nil {
// The `DisconnectedF` funcition already called for this peer less than `cache.DefaultExpiration` ago. Skip.
// (Very probably a bug in libp2p.)
log.WithError(err).Trace("Failed to set peer disconnection time")
return
}
// Only log disconnections if we were fully connected.
if priorState == peers.Connected {
@@ -231,6 +254,28 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
})
}
// wasDisconnectedTooRecently checks if the peer was disconnected within the last second.
func (s *Service) wasDisconnectedTooRecently(peerID peer.ID) error {
const disconnectionDurationThreshold = 1 * time.Second
peerDisconnectionTimeObj, ok := s.peerDisconnectionTime.Get(peerID.String())
if !ok {
return nil
}
peerDisconnectionTime, ok := peerDisconnectionTimeObj.(time.Time)
if !ok {
return errors.New("invalid peer disconnection time type")
}
timeSinceDisconnection := time.Since(peerDisconnectionTime)
if timeSinceDisconnection < disconnectionDurationThreshold {
return errors.Errorf("peer %s was disconnected too recently: %s", peerID, timeSinceDisconnection)
}
return nil
}
func agentString(pid peer.ID, hst host.Host) string {
rawVersion, storeErr := hst.Peerstore().Get(pid, agentVersionKey)

View File

@@ -100,21 +100,24 @@ func (s *BadResponsesScorer) countNoLock(pid peer.ID) (int, error) {
// Increment increments the number of bad responses we have received from the given remote peer.
// If peer doesn't exist this method is no-op.
func (s *BadResponsesScorer) Increment(pid peer.ID) {
func (s *BadResponsesScorer) Increment(pid peer.ID) int {
if pid == "" {
return
return 0
}
s.store.Lock()
defer s.store.Unlock()
peerData, ok := s.store.PeerData(pid)
if !ok {
s.store.SetPeerData(pid, &peerdata.PeerData{
BadResponses: 1,
})
return
if ok {
peerData.BadResponses++
return peerData.BadResponses
}
peerData.BadResponses++
const badResponses = 1
peerData = &peerdata.PeerData{BadResponses: badResponses}
s.store.SetPeerData(pid, peerData)
return badResponses
}
// IsBadPeer states if the peer is to be considered bad.

View File

@@ -31,6 +31,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -88,6 +89,7 @@ type Service struct {
activeValidatorCount uint64
custodyGroupCountMut sync.RWMutex // Protects custodyGroupCount
custodyGroupCount uint64
peerDisconnectionTime *cache.Cache
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
@@ -117,16 +119,17 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ipLimiter := leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */)
s := &Service{
ctx: ctx,
cancel: cancel,
cfg: cfg,
addrFilter: addrFilter,
ipLimiter: ipLimiter,
privKey: privKey,
metaData: metaData,
isPreGenesis: true,
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
subnetsLock: make(map[uint64]*sync.RWMutex),
ctx: ctx,
cancel: cancel,
cfg: cfg,
addrFilter: addrFilter,
ipLimiter: ipLimiter,
privKey: privKey,
metaData: metaData,
isPreGenesis: true,
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
subnetsLock: make(map[uint64]*sync.RWMutex),
peerDisconnectionTime: cache.New(1*time.Second, 1*time.Minute),
}
ipAddr := prysmnetwork.IPAddr()
@@ -488,14 +491,17 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error
if info.ID == s.host.ID() {
return nil
}
if err := s.Peers().IsBad(info.ID); err != nil {
return errors.Wrap(err, "refused to connect to bad peer")
return errors.Wrap(err, "bad peer")
}
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()
if err := s.host.Connect(ctx, info); err != nil {
s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
return err
s.downscorePeer(info.ID, "connectionError")
return errors.Wrap(err, "peer connect")
}
return nil
}
@@ -526,3 +532,8 @@ func (s *Service) connectToBootnodes() error {
func (s *Service) isInitialized() bool {
return !s.genesisTime.IsZero() && len(s.genesisValidatorsRoot) == 32
}
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -381,7 +381,7 @@ func initializeStateWithForkDigest(_ context.Context, t *testing.T, gs startup.C
return fd
}
// TODO: Uncomment when out of devnet
// TODO: Uncomment out of devnet.
// func TestService_connectWithPeer(t *testing.T) {
// params.SetupTestConfigCleanup(t)
// tests := []struct {
@@ -393,7 +393,7 @@ func initializeStateWithForkDigest(_ context.Context, t *testing.T, gs startup.C
// {
// name: "bad peer",
// peers: func() *peers.Status {
// ps := peers.NewStatus(context.Background(), &peers.StatusConfig{
// ps := peers.NewStatus(t.Context(), &peers.StatusConfig{
// ScorerParams: &scorers.Config{},
// })
// for i := 0; i < 10; i++ {
@@ -402,7 +402,7 @@ func initializeStateWithForkDigest(_ context.Context, t *testing.T, gs startup.C
// return ps
// }(),
// info: peer.AddrInfo{ID: "bad"},
// wantErr: "refused to connect to bad peer",
// wantErr: "bad peer",
// },
// }
// for _, tt := range tests {
@@ -413,7 +413,7 @@ func initializeStateWithForkDigest(_ context.Context, t *testing.T, gs startup.C
// t.Fatal(err)
// }
// }()
// ctx := context.Background()
// ctx := t.Context()
// s := &Service{
// host: h,
// peers: tt.peers,

View File

@@ -181,6 +181,11 @@ func (s *Service) findPeersWithSubnets(
// Get all needed subnets that the node is subscribed to.
// Skip nodes that are not subscribed to any of the defective subnets.
node := iterator.Node()
if !s.filterPeer(node) {
continue
}
nodeSubnets, err := filter(node)
if err != nil {
return nil, errors.Wrap(err, "filter node")

View File

@@ -315,7 +315,7 @@ func (bs *Server) ListIndexedAttestationsElectra(
// that it was included in a block. The attestation may have expired.
// Refer to the ethereum consensus specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
func (bs *Server) AttestationPool(_ context.Context, req *ethpb.AttestationPoolRequest) (*ethpb.AttestationPoolResponse, error) {
var atts []*ethpb.Attestation
var err error

View File

@@ -262,7 +262,7 @@ func (vs *Server) activationStatus(
// It cannot faithfully attest to the head block of the chain, since it has not fully verified that block.
//
// Spec:
// https://github.com/ethereum/consensus-specs/blob/dev/sync/optimistic.md
// https://github.com/ethereum/consensus-specs/blob/master/sync/optimistic.md
func (vs *Server) optimisticStatus(ctx context.Context) error {
if slots.ToEpoch(vs.TimeFetcher.CurrentSlot()) < params.BeaconConfig().BellatrixForkEpoch {
return nil

View File

@@ -17,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type Service struct {
@@ -211,7 +212,7 @@ func (s *Service) importBatches(ctx context.Context) {
_, err := s.batchImporter(ctx, current, ib, s.store)
if err != nil {
log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import")
s.downscore(ib)
s.downscorePeer(ib.blockPid, "backfillBatchImportError")
s.batchSeq.update(ib.withState(batchErrRetryable))
// If a batch fails, the subsequent batches are no longer considered importable.
break
@@ -336,10 +337,6 @@ func (s *Service) initBatches() error {
return nil
}
func (s *Service) downscore(b batch) {
s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid)
}
func (*Service) Stop() error {
return nil
}
@@ -383,3 +380,8 @@ func (s *Service) WaitForCompletion() error {
return nil
}
}
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -343,14 +343,15 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
}
}
}
if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) {
// Peer returned invalid data, penalize.
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blocksFrom)
log.WithField("pid", response.blocksFrom).Debug("Peer is penalized for invalid blocks")
} else if errors.Is(response.err, verification.ErrBlobInvalid) {
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blobsFrom)
log.WithField("pid", response.blobsFrom).Debug("Peer is penalized for invalid blob response")
q.downscorePeer(response.blocksFrom, "invalidBlocks")
}
if errors.Is(response.err, verification.ErrBlobInvalid) {
q.downscorePeer(response.blobsFrom, "invalidBlobs")
}
return m.state, response.err
}
m.fetched = *response
@@ -461,6 +462,11 @@ func (q *blocksQueue) onProcessSkippedEvent(ctx context.Context) eventHandlerFn
}
}
func (q *blocksQueue) downscorePeer(peerID peer.ID, reason string) {
newScore := q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}
// onCheckStaleEvent is an event that allows to mark stale epochs,
// so that they can be re-processed.
func onCheckStaleEvent(ctx context.Context) eventHandlerFn {

View File

@@ -522,7 +522,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
})
assert.ErrorContains(t, beaconsync.ErrInvalidFetchedData.Error(), err)
assert.Equal(t, stateScheduled, updatedState)
assert.LogsContain(t, hook, "msg=\"Peer is penalized for invalid blocks\" pid=ZiCa")
assert.LogsContain(t, hook, "Downscore peer")
})
t.Run("transition ok", func(t *testing.T) {

View File

@@ -17,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -461,12 +462,11 @@ func isPunishableError(err error) bool {
func (s *Service) updatePeerScorerStats(data *blocksQueueFetchedData, count uint64, err error) {
if isPunishableError(err) {
if verification.IsBlobValidationFailure(err) {
log.WithError(err).WithField("peer_id", data.blobsFrom).Warn("Downscoring peer for invalid blobs")
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blobsFrom)
s.downscorePeer(data.blobsFrom, "invalidBlobs")
} else {
log.WithError(err).WithField("peer_id", data.blocksFrom).Warn("Downscoring peer for invalid blocks")
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blocksFrom)
s.downscorePeer(data.blocksFrom, "invalidBlocks")
}
// If the error is punishable, exit here so that we don't give them credit for providing bad blocks.
return
}
@@ -492,3 +492,8 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool
}
return false
}
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
@@ -123,7 +124,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
collector, err := l.retrieveCollector(topic)
if err != nil {
return err
return errors.Wrap(err, "retrieve collector")
}
remaining := collector.Remaining(remotePeer.String())
@@ -132,7 +133,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
amt = 1
}
if amt > uint64(remaining) {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
l.downscorePeer(remotePeer, topic, "rateLimitExceeded")
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
}
@@ -140,22 +141,20 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
}
// This is used to validate all incoming rpc streams from external peers.
func (l *limiter) validateRawRpcRequest(stream network.Stream) error {
func (l *limiter) validateRawRpcRequest(stream network.Stream, amt uint64) error {
l.RLock()
defer l.RUnlock()
topic := rpcLimiterTopic
collector, err := l.retrieveCollector(topic)
remotePeer := stream.Conn().RemotePeer()
collector, err := l.retrieveCollector(rpcLimiterTopic)
if err != nil {
return err
}
key := stream.Conn().RemotePeer().String()
remaining := collector.Remaining(key)
// Treat each request as a minimum of 1.
amt := int64(1)
if amt > remaining {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
if amt > uint64(remaining) {
l.downscorePeer(remotePeer, rpcLimiterTopic, "rawRateLimitExceeded")
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
}
@@ -234,3 +233,13 @@ func (l *limiter) retrieveCollector(topic string) (*leakybucket.Collector, error
func (_ *limiter) topicLogger(topic string) *logrus.Entry {
return log.WithField("rateLimiter", topic)
}
func (l *limiter) downscorePeer(peerID peer.ID, topic, reason string) {
newScore := l.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{
"peerID": peerID.String(),
"reason": reason,
"newScore": newScore,
"topic": topic,
}).Debug("Downscore peer")
}

View File

@@ -85,20 +85,20 @@ func TestRateLimiter_ExceedCapacity(t *testing.T) {
// stream, err := p1.BHost.NewStream(context.Background(), p2.PeerID(), protocol.ID(topic))
// require.NoError(t, err, "could not create stream")
// for i := 0; i < 2*defaultBurstLimit; i++ {
// err = rlimiter.validateRawRpcRequest(stream)
// rlimiter.addRawStream(stream)
// require.NoError(t, err, "could not validate incoming request")
// }
// // Triggers rate limit error on burst.
// assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream))
// for i := 0; i < 2*defaultBurstLimit; i++ {
// err = rlimiter.validateRawRpcRequest(stream, 1)
// rlimiter.addRawStream(stream)
// require.NoError(t, err, "could not validate incoming request")
// }
// // Triggers rate limit error on burst.
// assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream, 1))
// // Make Peer bad.
// for i := 0; i < defaultBurstLimit; i++ {
// assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream))
// }
// assert.NotNil(t, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer")
// require.NoError(t, stream.Close(), "could not close stream")
// // Make Peer bad.
// for i := 0; i < defaultBurstLimit; i++ {
// assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream, 1))
// }
// assert.NotNil(t, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer")
// require.NoError(t, stream.Close(), "could not close stream")
// if util.WaitTimeout(&wg, 1*time.Second) {
// t.Fatal("Did not receive stream within 1 sec")

View File

@@ -20,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/sirupsen/logrus"
)
var (
@@ -39,7 +40,7 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
// Fulu: https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#messages
// Fulu: https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#messages
if forkIndex >= version.Fulu {
return map[string]rpcHandler{
p2p.RPCStatusTopicV2: s.statusRPCHandler, // Modified in Fulu
@@ -55,7 +56,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
}, nil
}
// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
// Electra: https://github.com/ethereum/consensus-specs/blob/master/specs/electra/p2p-interface.md#messages
if forkIndex >= version.Electra {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -69,7 +70,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
}, nil
}
// Deneb: https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#messages
// Deneb: https://github.com/ethereum/consensus-specs/blob/master/specs/deneb/p2p-interface.md#messages
if forkIndex >= version.Deneb {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -83,9 +84,9 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
}, nil
}
// Capella: https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/p2p-interface.md#messages
// Bellatrix: https://github.com/ethereum/consensus-specs/blob/dev/specs/bellatrix/p2p-interface.md#messages
// Altair: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/p2p-interface.md#messages
// Capella: https://github.com/ethereum/consensus-specs/blob/master/specs/capella/p2p-interface.md#messages
// Bellatrix: https://github.com/ethereum/consensus-specs/blob/master/specs/bellatrix/p2p-interface.md#messages
// Altair: https://github.com/ethereum/consensus-specs/blob/master/specs/altair/p2p-interface.md#messages
if forkIndex >= version.Altair {
handler := map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -106,7 +107,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
return handler, nil
}
// PhaseO: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages
// PhaseO: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#messages
if forkIndex >= version.Phase0 {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -239,7 +240,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
defer span.End()
span.SetAttributes(trace.StringAttribute("topic", topic))
span.SetAttributes(trace.StringAttribute("peer", remotePeer.String()))
log := log.WithField("peer", stream.Conn().RemotePeer().String()).WithField("topic", string(stream.Protocol()))
log := log.WithFields(logrus.Fields{"peer": remotePeer.String(), "topic": string(stream.Protocol())})
// Check before hand that peer is valid.
if err := s.cfg.p2p.Peers().IsBad(remotePeer); err != nil {
@@ -249,7 +250,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
// Validate request according to peer limits.
if err := s.rateLimiter.validateRawRpcRequest(stream); err != nil {
if err := s.rateLimiter.validateRawRpcRequest(stream, 1); err != nil {
log.WithError(err).Debug("Could not validate rpc request from peer")
return
}
@@ -305,7 +306,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
logStreamErrors(err, topic)
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "registerRpcError")
return
}
if err := handle(ctx, msg, stream); err != nil {
@@ -325,7 +326,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
logStreamErrors(err, topic)
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "registerRpcError")
return
}
if err := handle(ctx, nTyp.Elem().Interface(), stream); err != nil {

View File

@@ -15,6 +15,7 @@ import (
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -43,7 +44,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "beaconBlocksByRangeRPCHandlerValidationError")
tracing.AnnotateError(span, err)
return err
}
@@ -201,3 +202,13 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch,
return nil
}
func (s *Service) downscorePeer(peerID peer.ID, reason string, fields ...logrus.Fields) {
log := log
for _, field := range fields {
log = log.WithFields(field)
}
newScore := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -167,9 +167,11 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return errors.New("no block roots provided")
}
remotePeer := stream.Conn().RemotePeer()
currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
if uint64(len(blockRoots)) > params.MaxRequestBlock(currentEpoch) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "beaconBlocksRootRPCHandlerTooManyRoots")
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
}

View File

@@ -71,14 +71,18 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
if !ok {
return errors.New("message is not type *pb.BlobsSidecarsByRangeRequest")
}
// TODO: Uncomment out of devnet.
// if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
// return err
// }
remotePeer := stream.Conn().RemotePeer()
rp, err := validateBlobsByRange(r, s.cfg.chain.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "blobSidecarsByRangeRpcHandlerValidationError")
tracing.AnnotateError(span, err)
return err
}
@@ -88,7 +92,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
defer ticker.Stop()
batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)
if err != nil {
log.WithError(err).Info("error in BlobSidecarsByRange batch")
log.WithError(err).Error("Cannot create new block range batcher")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
return err
@@ -119,7 +123,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
}
}
if err := batch.error(); err != nil {
log.WithError(err).Debug("error in BlobSidecarsByRange batch")
log.WithError(err).Debug("Error in BlobSidecarsByRange batch")
// If a rate limit is hit, it means an error response has already been sent and the stream has been closed.
if !errors.Is(err, p2ptypes.ErrRateLimited) {

View File

@@ -39,7 +39,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
cs := s.cfg.clock.CurrentSlot()
remotePeer := stream.Conn().RemotePeer()
if err := validateBlobByRootRequest(blobIdents, cs); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "blobSidecarsByRootRpcHandlerValidationError")
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return err
}

View File

@@ -65,7 +65,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
rangeParameters, err := validateDataColumnsByRange(request, s.cfg.chain.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "dataColumnSidecarsByRangeRpcHandlerValidationError")
tracing.AnnotateError(span, err)
return errors.Wrap(err, "validate data columns by range")
}

View File

@@ -27,7 +27,7 @@ var (
)
// dataColumnSidecarByRootRPCHandler handles the data column sidecars by root RPC request.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler")
defer span.End()
@@ -42,7 +42,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}
requestedColumnIdents := ref
remotePeerId := stream.Conn().RemotePeer()
remotePeer := stream.Conn().RemotePeer()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
@@ -51,7 +51,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
// Penalize peers that send invalid requests.
if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeerId)
s.downscorePeer(remotePeer, "dataColumnSidecarByRootRPCHandlerValidationError")
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return errors.Wrap(err, "validate data columns by root request")
}
@@ -85,7 +85,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}
log := log.WithFields(logrus.Fields{
"peer": remotePeerId,
"peer": remotePeer,
"columns": requestedColumnsByRootLog,
})

View File

@@ -13,6 +13,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -35,22 +36,40 @@ var backOffTime = map[primitives.SSZUint64]time.Duration{
// goodbyeRPCHandler reads the incoming goodbye rpc message from the peer.
func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
const amount = 1
SetRPCStreamDeadlines(stream)
peerID := stream.Conn().RemotePeer()
m, ok := msg.(*primitives.SSZUint64)
if !ok {
return fmt.Errorf("wrong message type for goodbye, got %T, wanted *uint64", msg)
}
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
log.WithError(err).Debug("Goodbye message from rate-limited peer")
} else {
isRateLimitedPeer := false
if err := s.rateLimiter.validateRequest(stream, amount); err != nil {
if !errors.Is(err, p2ptypes.ErrRateLimited) {
return errors.Wrap(err, "validate request")
}
isRateLimitedPeer = true
}
if !isRateLimitedPeer {
s.rateLimiter.add(stream, 1)
}
log := log.WithField("Reason", goodbyeMessage(*m))
log.WithField("peer", stream.Conn().RemotePeer()).Trace("Peer has sent a goodbye message")
s.cfg.p2p.Peers().SetNextValidTime(stream.Conn().RemotePeer(), goodByeBackoff(*m))
// closes all streams with the peer
return s.cfg.p2p.Disconnect(stream.Conn().RemotePeer())
log.WithFields(logrus.Fields{
"peer": peerID,
"reason": goodbyeMessage(*m),
"isRateLimited": isRateLimitedPeer,
}).Debug("Received a goodbye message")
s.cfg.p2p.Peers().SetNextValidTime(peerID, goodByeBackoff(*m))
if err := s.cfg.p2p.Disconnect(peerID); err != nil {
return errors.Wrap(err, "disconnect")
}
return nil
}
// disconnectBadPeer checks whether peer is considered bad by some scorer, and tries to disconnect
@@ -70,7 +89,7 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr
"peerID": id,
"agent": agentString(id, s.cfg.p2p.Host()),
}).
Debug("Sent peer disconnection")
Debug("Sent bad peer disconnection")
}
// A custom goodbye method that is used by our connection handler, in the

View File

@@ -26,7 +26,7 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)
@@ -42,7 +42,7 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
logger.WithError(err).Error("s.cfg.beaconDB.LightClientBootstrap")
logger.WithError(err).Error("Cannot bootstrap light client")
return err
}
if bootstrap == nil {
@@ -74,10 +74,11 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
defer cancel()
logger := log.WithField("handler", p2p.LightClientUpdatesByRangeName[1:])
remotePeer := stream.Conn().RemotePeer()
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)
@@ -90,7 +91,8 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
if r.Count == 0 {
s.writeErrorResponseToStream(responseCodeInvalidRequest, "count is 0", stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "lightClientUpdatesByRangeRPCHandlerCount0")
logger.Error("Count is 0")
return nil
}
@@ -102,7 +104,7 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
endPeriod, err := math.Add64(r.StartPeriod, r.Count-1)
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "lightClientUpdatesByRangeRPCHandlerEndPeriodOverflow")
tracing.AnnotateError(span, err)
logger.WithError(err).Error("End period overflows")
return err
@@ -114,7 +116,7 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
logger.WithError(err).Error("s.cfg.beaconDB.LightClientUpdates")
logger.WithError(err).Error("Cannot retrieve light client updates")
return err
}
@@ -153,7 +155,7 @@ func (s *Service) lightClientFinalityUpdateRPCHandler(ctx context.Context, _ int
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)
@@ -191,7 +193,7 @@ func (s *Service) lightClientOptimisticUpdateRPCHandler(ctx context.Context, _ i
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)

View File

@@ -172,12 +172,12 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
// Read the METADATA response from the peer.
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
if err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.downscorePeer(peerID, "MetadataReadStatusCodeError")
return nil, errors.Wrap(err, "read status code")
}
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.downscorePeer(peerID, "NonNullMetadataReadStatusCode")
return nil, errors.New(errMsg)
}
@@ -214,8 +214,8 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
// Decode the metadata from the peer.
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return nil, err
s.downscorePeer(peerID, "MetadataDecodeError")
return nil, errors.Wrap(err, "decode with max length")
}
return msg, nil

View File

@@ -13,6 +13,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// pingHandler reads the incoming ping rpc message from the peer.
@@ -27,6 +28,7 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
if !ok {
return fmt.Errorf("wrong message type for ping, got %T, wanted *uint64", msg)
}
sequenceNumber := uint64(*m)
// Validate the incoming request regarding rate limiting.
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
@@ -39,14 +41,9 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
peerID := stream.Conn().RemotePeer()
// Check if the peer sequence number is higher than the one we have in our store.
valid, err := s.validateSequenceNum(*m, peerID)
valid, err := s.isSequenceNumberUpToDate(sequenceNumber, peerID)
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
return errors.Wrap(err, "validate sequence number")
}
@@ -141,7 +138,7 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
// If the peer responded with an error, increment the bad responses scorer.
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.downscorePeer(peerID, "NotNullPingReadStatusCode")
return errors.Errorf("code: %d - %s", code, errMsg)
}
@@ -150,15 +147,11 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
return errors.Wrap(err, "decode sequence number")
}
sequenceNumber := uint64(*msg)
// Determine if the peer's sequence number returned by the peer is higher than the one we have in our store.
valid, err := s.validateSequenceNum(*msg, peerID)
valid, err := s.isSequenceNumberUpToDate(sequenceNumber, peerID)
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
}
return errors.Wrap(err, "validate sequence number")
}
@@ -180,27 +173,36 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
return nil
}
// validateSequenceNum validates the peer's sequence number.
// - If the peer's sequence number is greater than the sequence number we have in our store for the peer, return false.
// - If the peer's sequence number is equal to the sequence number we have in our store for the peer, return true.
// - If the peer's sequence number is less than the sequence number we have in our store for the peer, return an error.
func (s *Service) validateSequenceNum(seq primitives.SSZUint64, id peer.ID) (bool, error) {
// isSequenceNumberUpToDate check if our internal sequence number for the peer is up to date wrt. the incoming one.
// - If the incoming sequence number is greater than the sequence number we have in our store for the peer, return false.
// - If the incoming sequence number is equal to the sequence number we have in our store for the peer, return true.
// - If the incoming sequence number is less than the sequence number we have in our store for the peer, return an error.
func (s *Service) isSequenceNumberUpToDate(incomingSequenceNumber uint64, peerID peer.ID) (bool, error) {
// Retrieve the metadata for the peer we got in our store.
md, err := s.cfg.p2p.Peers().Metadata(id)
storedMetadata, err := s.cfg.p2p.Peers().Metadata(peerID)
if err != nil {
return false, errors.Wrap(err, "get metadata")
return false, errors.Wrap(err, "peers metadata")
}
// If we have no metadata for the peer, return false.
if md == nil || md.IsNil() {
if storedMetadata == nil || storedMetadata.IsNil() {
return false, nil
}
// The peer's sequence number must be less than or equal to the sequence number we have in our store.
if md.SequenceNumber() > uint64(seq) {
storedSequenceNumber := storedMetadata.SequenceNumber()
if storedSequenceNumber > incomingSequenceNumber {
s.downscorePeer(peerID, "pingInvalidSequenceNumber", logrus.Fields{
"storedSequenceNumber": storedSequenceNumber,
"incomingSequenceNumber": incomingSequenceNumber,
})
return false, p2ptypes.ErrInvalidSequenceNum
}
// Return true if the peer's sequence number is equal to the sequence number we have in our store.
return md.SequenceNumber() == uint64(seq), nil
// If this is the case, our information about the peer is outdated.
if storedSequenceNumber < incomingSequenceNumber {
return false, nil
}
return true, nil
}

View File

@@ -37,6 +37,9 @@ func (s *Service) maintainPeerStatuses() {
wg.Add(1)
go func(id peer.ID) {
defer wg.Done()
log := log.WithField("peer", id)
// If our peer status has not been updated correctly we disconnect over here
// and set the connection state over here instead.
if s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected {
@@ -45,27 +48,26 @@ func (s *Service) maintainPeerStatuses() {
log.WithError(err).Debug("Error when disconnecting with peer")
}
s.cfg.p2p.Peers().SetConnectionState(id, peers.Disconnected)
log.WithFields(logrus.Fields{
"peer": id,
"reason": "maintain peer statuses - peer is not connected",
}).Debug("Initiate peer disconnection")
log.WithField("reason", "maintainPeerStatusesNotConnectedPeer").Debug("Initiate peer disconnection")
return
}
// Disconnect from peers that are considered bad by any of the registered scorers.
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
s.disconnectBadPeer(s.ctx, id, err)
return
}
// If the status hasn't been updated in the recent interval time.
lastUpdated, err := s.cfg.p2p.Peers().ChainStateLastUpdated(id)
if err != nil {
// Peer has vanished; nothing to do.
return
}
if prysmTime.Now().After(lastUpdated.Add(interval)) {
if err := s.reValidatePeer(s.ctx, id); err != nil {
log.WithField("peer", id).WithError(err).Debug("Could not revalidate peer")
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
log.WithError(err).Debug("Cannot re-validate peer")
}
}
}(pid)
@@ -130,7 +132,7 @@ func (s *Service) shouldReSync() bool {
}
// sendRPCStatusRequest for a given topic with an expected protobuf message type.
func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
func (s *Service) sendRPCStatusRequest(ctx context.Context, peer peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
@@ -157,7 +159,7 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
cp := s.cfg.chain.FinalizedCheckpt()
status := s.buildStatusFromEpoch(currentEpoch, forkDigest, cp.Root, cp.Epoch, headRoot)
stream, err := s.cfg.p2p.Send(ctx, status, topic, id)
stream, err := s.cfg.p2p.Send(ctx, status, topic, peer)
if err != nil {
return errors.Wrap(err, "p2p send")
}
@@ -166,11 +168,11 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
if err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(peer, "statusRequestReadStatusCodeError")
return errors.Wrap(err, "read status code")
}
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
s.downscorePeer(peer, "statusRequestNonNullStatusCode")
return errors.New(errMsg)
}
@@ -181,9 +183,9 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
// If validation fails, validation error is logged, and peer status scorer will mark peer as bad.
err = s.validateStatusMessage(ctx, msg)
s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(id, msg, err)
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
s.disconnectBadPeer(s.ctx, id, err)
s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(peer, msg, err)
if err := s.cfg.p2p.Peers().IsBad(peer); err != nil {
s.disconnectBadPeer(s.ctx, peer, err)
}
return err
@@ -269,7 +271,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
return nil
default:
respCode = responseCodeInvalidRequest
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "statusRpcHandlerInvalidMessage")
}
originalErr := err

View File

@@ -27,6 +27,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/operations/synccommittee"
"github.com/OffchainLabs/prysm/v6/beacon-chain/operations/voluntaryexits"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen"
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync/backfill/coverage"
@@ -45,6 +46,7 @@ import (
lru "github.com/hashicorp/golang-lru"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
gcache "github.com/patrickmn/go-cache"
"github.com/pkg/errors"
@@ -285,19 +287,32 @@ func (s *Service) Start() {
// Stop the regular sync service.
func (s *Service) Stop() error {
defer func() {
s.cancel()
if s.rateLimiter != nil {
s.rateLimiter.free()
}
}()
// Say goodbye to all peers.
for _, peerID := range s.cfg.p2p.Peers().Connected() {
if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected {
if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeClientShutdown, peerID); err != nil {
log.WithError(err).WithField("peerID", peerID).Error("Failed to send goodbye message")
}
}
}
// Removing RPC Stream handlers.
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
s.cfg.p2p.Host().RemoveStreamHandler(p)
}
// Deregister Topic Subscribers.
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
s.unSubscribeFromTopic(t)
}
defer s.cancel()
return nil
}

View File

@@ -26,7 +26,7 @@ import (
"github.com/sirupsen/logrus"
)
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
const dataColumnSidecarSubTopic = "/data_column_sidecar_%d/"
@@ -89,7 +89,7 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)
// Start the verification process.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
// [REJECT] The sidecar is valid as verified by `verify_data_column_sidecar(sidecar)`.
if err := verifier.ValidFields(); err != nil {

View File

@@ -22,7 +22,7 @@ import (
var (
// GossipDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received on gossip
// must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
GossipDataColumnSidecarRequirements = []Requirement{
RequireValidFields,
RequireCorrectSubnet,
@@ -49,7 +49,7 @@ var (
// ByRangeRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
// via the by range request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
ByRangeRequestDataColumnSidecarRequirements = []Requirement{
RequireValidFields,
RequireSidecarInclusionProven,

View File

@@ -0,0 +1,7 @@
### Removed
- Partially reverting pr #15390 removing the `ssz-only` debug flag until there is a real usecase for the flag
### Added
- Added new PRYSM_API_OVERRIDE_ACCEPT environment variable to override ssz accept header as a replacement to flag

View File

@@ -0,0 +1,3 @@
### Changed
- Update links to consensus-specs to point to `master` branch

View File

@@ -0,0 +1,2 @@
### Fixed
- Fixed various reasons why a node is banned by its peers when it stops.

View File

@@ -51,7 +51,6 @@ type Flags struct {
EnableExperimentalAttestationPool bool // EnableExperimentalAttestationPool enables an experimental attestation pool design.
DisableDutiesV2 bool // DisableDutiesV2 sets validator client to use the get Duties endpoint
EnableWeb bool // EnableWeb enables the webui on the validator client
SSZOnly bool // SSZOnly forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled (useful for debugging)
// Logging related toggles.
DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.
EnableFullSSZDataLogging bool // Enables logging for full ssz data on rejected gossip messages
@@ -356,10 +355,6 @@ func ConfigureValidator(ctx *cli.Context) error {
logEnabled(EnableWebFlag)
cfg.EnableWeb = true
}
if ctx.Bool(SSZOnly.Name) {
logEnabled(SSZOnly)
cfg.SSZOnly = true
}
cfg.KeystoreImportDebounceInterval = ctx.Duration(dynamicKeyReloadDebounceInterval.Name)
Init(cfg)

View File

@@ -211,12 +211,6 @@ var (
Usage: "(Work in progress): Enables the web portal for the validator client.",
Value: false,
}
// SSZOnly forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled
SSZOnly = &cli.BoolFlag{
Name: "ssz-only",
Usage: "(debug): Forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
@@ -239,7 +233,6 @@ var ValidatorFlags = append(deprecatedFlags, []cli.Flag{
EnableBeaconRESTApi,
DisableDutiesV2,
EnableWebFlag,
SSZOnly,
}...)
// E2EValidatorFlags contains a list of the validator feature flags to be tested in E2E.

View File

@@ -4,6 +4,10 @@ import (
"testing"
)
const (
EnvNameOverrideAccept = "PRYSM_API_OVERRIDE_ACCEPT"
)
// SetupTestConfigCleanup preserves configurations allowing to modify them within tests without any
// restrictions, everything is restored after the test.
func SetupTestConfigCleanup(t testing.TB) {

View File

@@ -27,7 +27,7 @@ type VersionedUnmarshaler struct {
// Fork aligns with the fork names in config/params/values.go
Fork int
// Version corresponds to the Version type defined in the beacon-chain spec, aka a "fork version number":
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#custom-types
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#custom-types
Version [fieldparams.VersionLength]byte
}

View File

@@ -73,7 +73,7 @@ message BeaconBlockBody {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4

View File

@@ -171,7 +171,7 @@ message BeaconBlockBody {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -402,7 +402,7 @@ message BeaconBlockBodyAltair {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -487,7 +487,7 @@ message BeaconBlockBodyBellatrix {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -629,7 +629,7 @@ message BeaconBlockBodyCapella {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -813,7 +813,7 @@ message BeaconBlockBodyDeneb {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/core/0_beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -1040,7 +1040,7 @@ message BeaconBlockBodyElectra {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/core/0_beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4

View File

@@ -100,7 +100,7 @@ service BeaconChain {
// that it was included in a block. The attestation may have expired.
// Refer to the Ethereum Beacon Chain specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
rpc AttestationPool(AttestationPoolRequest)
returns (AttestationPoolResponse) {
option deprecated = true;
@@ -117,7 +117,7 @@ service BeaconChain {
// that it was included in a block. The attestation may have expired.
// Refer to the Ethereum Beacon Chain specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
rpc AttestationPoolElectra(AttestationPoolRequest)
returns (AttestationPoolElectraResponse) {
option deprecated = true;

View File

@@ -191,7 +191,7 @@ message PowBlock {
// The beacon state for Altair hard fork 1.
// Reference:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/beacon-chain.md#beaconstate
// https://github.com/ethereum/consensus-specs/blob/master/specs/altair/beacon-chain.md#beaconstate
message BeaconStateAltair {
// Versioning [1001-2000]
uint64 genesis_time = 1001;
@@ -747,4 +747,4 @@ message BeaconStateFulu {
// Fields introduced in Fulu fork [13001-14000]
repeated uint64 proposer_lookahead = 13001
[ (ethereum.eth.ext.ssz_size) = "proposer_lookahead_size" ];
}
}

View File

@@ -248,9 +248,6 @@ func (v *ValidatorNode) Start(ctx context.Context) error {
args = append(args,
fmt.Sprintf("--%s=http://localhost:%d", flags.BeaconRESTApiProviderFlag.Name, beaconRestApiPort),
fmt.Sprintf("--%s", features.EnableBeaconRESTApi.Name))
if v.config.UseSSZOnly {
args = append(args, fmt.Sprintf("--%s", features.SSZOnly.Name))
}
}
// Only apply e2e flags to the current branch. New flags may not exist in previous release.

View File

@@ -25,14 +25,14 @@ func TestEndToEnd_MinimalConfig_Web3Signer_PersistentKeys(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithRemoteSignerAndPersistentKeysFile()).run()
}
func TestEndToEnd_MinimalConfig_ValidatorRESTApi(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi()).run()
}
func TestEndToEnd_MinimalConfig_ValidatorRESTApi_SSZ(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi(), types.WithSSZOnly()).run()
}
func TestEndToEnd_MinimalConfig_ValidatorRESTApi(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi()).run()
}
func TestEndToEnd_ScenarioRun_EEOffline(t *testing.T) {
t.Skip("TODO(#10242) Prysm is current unable to handle an offline e2e")
cfg := types.InitForkCfg(version.Bellatrix, version.Deneb, params.E2ETestConfig())

View File

@@ -11,9 +11,11 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v6/testing/endtoend/types",
visibility = ["//testing/endtoend:__subpackages__"],
deps = [
"//api:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//runtime/version:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)

View File

@@ -6,9 +6,11 @@ import (
"context"
"os"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
@@ -51,18 +53,20 @@ func WithValidatorRESTApi() E2EConfigOpt {
}
}
func WithSSZOnly() E2EConfigOpt {
return func(cfg *E2EConfig) {
cfg.UseSSZOnly = true
}
}
func WithBuilder() E2EConfigOpt {
return func(cfg *E2EConfig) {
cfg.UseBuilder = true
}
}
func WithSSZOnly() E2EConfigOpt {
return func(cfg *E2EConfig) {
if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
logrus.Fatal(err)
}
}
}
// E2EConfig defines the struct for all configurations needed for E2E testing.
type E2EConfig struct {
TestCheckpointSync bool
@@ -76,7 +80,6 @@ type E2EConfig struct {
UseFixedPeerIDs bool
UseValidatorCrossClient bool
UseBeaconRestApi bool
UseSSZOnly bool
UseBuilder bool
EpochsToRun uint64
Seed int64

View File

@@ -46,7 +46,6 @@ go_library(
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
@@ -128,7 +127,6 @@ go_test(
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/rpc/eth/shared/testing:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/validator:go_default_library",

View File

@@ -9,7 +9,6 @@ import (
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/assert"
@@ -215,11 +214,6 @@ func TestGetBeaconBlock_SSZ_BellatrixValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -262,11 +256,6 @@ func TestGetBeaconBlock_SSZ_BlindedBellatrixValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -309,11 +298,6 @@ func TestGetBeaconBlock_SSZ_CapellaValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoCapellaBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -356,11 +340,6 @@ func TestGetBeaconBlock_SSZ_BlindedCapellaValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedCapellaBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -403,11 +382,6 @@ func TestGetBeaconBlock_SSZ_DenebValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoDenebBeaconBlockContents()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -450,11 +424,6 @@ func TestGetBeaconBlock_SSZ_BlindedDenebValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedDenebBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -497,11 +466,6 @@ func TestGetBeaconBlock_SSZ_ElectraValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoElectraBeaconBlockContents()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -544,11 +508,6 @@ func TestGetBeaconBlock_SSZ_BlindedElectraValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedElectraBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -591,11 +550,6 @@ func TestGetBeaconBlock_SSZ_UnsupportedVersion(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
const slot = primitives.Slot(1)
randaoReveal := []byte{2}
graffiti := []byte{3}
@@ -625,11 +579,6 @@ func TestGetBeaconBlock_SSZ_InvalidBlindedHeader(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -663,11 +612,6 @@ func TestGetBeaconBlock_SSZ_InvalidVersionHeader(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -701,11 +645,6 @@ func TestGetBeaconBlock_SSZ_GetSSZError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
const slot = primitives.Slot(1)
randaoReveal := []byte{2}
graffiti := []byte{3}
@@ -731,11 +670,6 @@ func TestGetBeaconBlock_SSZ_Phase0Valid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoPhase0BeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -778,11 +712,6 @@ func TestGetBeaconBlock_SSZ_AltairValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoAltairBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)

View File

@@ -7,15 +7,19 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/api/apiutil"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/network/httputil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type reqOption func(*http.Request)
type RestHandler interface {
Get(ctx context.Context, endpoint string, resp interface{}) error
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
@@ -26,16 +30,30 @@ type RestHandler interface {
}
type BeaconApiRestHandler struct {
client http.Client
host string
client http.Client
host string
reqOverrides []reqOption
}
// NewBeaconApiRestHandler returns a RestHandler
func NewBeaconApiRestHandler(client http.Client, host string) RestHandler {
return &BeaconApiRestHandler{
brh := &BeaconApiRestHandler{
client: client,
host: host,
}
brh.appendAcceptOverride()
return brh
}
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *BeaconApiRestHandler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
})
}
}
// HttpClient returns the underlying HTTP client of the handler
@@ -56,7 +74,6 @@ func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp in
if err != nil {
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
httpResp, err := c.client.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
@@ -76,13 +93,16 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
primaryAcceptType := fmt.Sprintf("%s;q=%s", api.OctetStreamMediaType, "0.95")
secondaryAcceptType := fmt.Sprintf("%s;q=%s", api.JsonMediaType, "0.9")
acceptHeaderString := fmt.Sprintf("%s,%s", primaryAcceptType, secondaryAcceptType)
if features.Get().SSZOnly {
acceptHeaderString = api.OctetStreamMediaType
}
req.Header.Set("Accept", acceptHeaderString)
for _, o := range c.reqOverrides {
o(req)
}
httpResp, err := c.client.Do(req)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
@@ -92,16 +112,17 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
return
}
}()
accept := req.Header.Get("Accept")
contentType := httpResp.Header.Get("Content-Type")
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
}
if !strings.Contains(primaryAcceptType, contentType) {
if !apiutil.PrimaryAcceptMatches(accept, contentType) {
log.WithFields(logrus.Fields{
"primaryAcceptType": primaryAcceptType,
"secondaryAcceptType": secondaryAcceptType,
"receivedAcceptType": contentType,
"Accept": accept,
"Content-Type": contentType,
}).Debug("Server responded with non primary accept type")
}
@@ -115,10 +136,6 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
return nil, nil, errorJson
}
if features.Get().SSZOnly && contentType != api.OctetStreamMediaType {
return nil, nil, errors.Errorf("server responded with non primary accept type %s", contentType)
}
return body, httpResp.Header, nil
}

View File

@@ -7,11 +7,13 @@ import (
"io"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/network/httputil"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
@@ -143,6 +145,25 @@ func TestGetSSZ(t *testing.T) {
})
}
func TestAcceptOverrideSSZ(t *testing.T) {
name := "TestAcceptOverride"
orig := os.Getenv(params.EnvNameOverrideAccept)
defer func() {
require.NoError(t, os.Setenv(params.EnvNameOverrideAccept, orig))
}()
require.NoError(t, os.Setenv(params.EnvNameOverrideAccept, name))
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, name, r.Header.Get("Accept"))
w.WriteHeader(200)
_, err := w.Write([]byte("ok"))
require.NoError(t, err)
}))
defer srv.Close()
c := NewBeaconApiRestHandler(http.Client{Timeout: time.Second * 5}, srv.URL)
_, _, err := c.GetSSZ(t.Context(), "/test")
require.NoError(t, err)
}
func TestPost(t *testing.T) {
ctx := t.Context()
const endpoint = "/example/rest/api/endpoint"