Compare commits

...

4 Commits

Author SHA1 Message Date
terence tsao
8c3375d87c Beacon API: fix proposer duty computation for Fulu 2025-07-25 12:18:12 -07:00
Justin Traglia
856742ff68 Update links to consensus-specs to point to master branch (#15523)
* Update links to consensus-specs to point to master branch

* Add changelog fragment
2025-07-23 08:32:09 +00:00
Manu NALEPA
abe16a9cb4 Fix downscoring by peers when a node gracefully stops. (#15505)
* Log when downscoring a peer.

* `validateSequenceNumber`: Downscore peer in function, clarify and add logs

* `AddConnectionHandler`: Move the majority of the code to the outer scope (no functional change).

* `disconnectBadPeer`: Improve log.

* `sendRPCStatusRequest`: Improve log.

* `findPeersWithSubnets`: Add preventive peer filtering.
(As done in `s.findPeers`.)

* `Stop`: Use one `defer` for the whole function.
Reminder: `defer`s are executed backwards.

* `Stop`: Send a goodbye message to all connected peers when stopping the service.

Before this commit, stopping the service did not send any goodbye message to connected peers. The issue with this approach is that each peer still thinks we are alive and behaves accordingly, trying to communicate with us. Unfortunately, because we are offline, we cannot respond. Because of that, the peer starts to downscore us and eventually bans us. As a consequence, when we restart, the peer refuses our connection request.

By sending a goodbye message when stopping the service, we ensure the peer stops expecting anything from us. When we restart, everything is all right. (A minimal sketch of this shutdown path follows this commit entry.)

* `ConnectedF` and `DisconnectedF`: Work around a very probable libp2p bug by preventing outbound connections to very recently disconnected peers.

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.

* `AddDisconnectionHandler`: Handle multiple close calls to `DisconnectedF` for the same peer.
2025-07-22 20:15:18 +00:00
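To make the goodbye-on-stop change above concrete, here is a minimal sketch of the shutdown path (illustrative only: `broadcastGoodbye` and `goodbyeFunc` are assumed names, not the exact Prysm implementation; the libp2p `host.Host` API is real):

package main

import (
	"context"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// broadcastGoodbye sends a goodbye to every connected peer, concurrently,
// before the node shuts down, so peers do not penalize us for going silent.
// goodbyeFunc stands in for Prysm's goodbye RPC sender.
func broadcastGoodbye(h host.Host, goodbyeFunc func(ctx context.Context, id peer.ID) error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	for _, pid := range h.Network().Peers() {
		wg.Add(1)
		go func(pid peer.ID) {
			defer wg.Done()
			// Best effort: individual failures are fine, we are stopping anyway.
			_ = goodbyeFunc(ctx, pid)
		}(pid)
	}
	wg.Wait()
}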
james-prysm
77958022e7 removing ssz-only flag (reverting feature) and fix accept header middleware (#15433)
* removing ssz-only flag

* gaz

* reverting other uses of sszonly

* gaz

* adding kasey and radek's suggestions

* update changelog

* adding test

* radek advice with new headers and tests

* adding logs and fixing comments

* adding logs and fixing comments

* gaz

* Update validator/client/beacon-api/rest_handler_client.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update api/apiutil/header.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update api/apiutil/header.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* radek's comments

* adding another failing case based on radek's suggestion

* another unit test

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-07-22 16:06:51 +00:00
63 changed files with 834 additions and 362 deletions

View File

@@ -2,18 +2,28 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["common.go"],
srcs = [
"common.go",
"header.go",
],
importpath = "github.com/OffchainLabs/prysm/v6/api/apiutil",
visibility = ["//visibility:public"],
deps = ["//consensus-types/primitives:go_default_library"],
deps = [
"//consensus-types/primitives:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["common_test.go"],
srcs = [
"common_test.go",
"header_test.go",
],
embed = [":go_default_library"],
deps = [
"//consensus-types/primitives:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

122
api/apiutil/header.go Normal file
View File

@@ -0,0 +1,122 @@
package apiutil
import (
"mime"
"sort"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
)
type mediaRange struct {
mt string // canonicalised mediatype, e.g. "application/json"
q float64 // quality factor (0–1)
raw string // original string, useful for logging/debugging
spec int // 2=exact, 1=type/*, 0=*/*
}
func parseMediaRange(field string) (mediaRange, bool) {
field = strings.TrimSpace(field)
mt, params, err := mime.ParseMediaType(field)
if err != nil {
log.WithError(err).Debug("Failed to parse header field")
return mediaRange{}, false
}
r := mediaRange{mt: mt, q: 1, spec: 2, raw: field}
if qs, ok := params["q"]; ok {
v, err := strconv.ParseFloat(qs, 64)
if err != nil || v < 0 || v > 1 {
log.WithField("q", qs).Debug("Invalid quality factor (01)")
return mediaRange{}, false // skip invalid entry
}
r.q = v
}
switch {
case mt == "*/*":
r.spec = 0
case strings.HasSuffix(mt, "/*"):
r.spec = 1
}
return r, true
}
func hasExplicitQ(r mediaRange) bool {
return strings.Contains(strings.ToLower(r.raw), ";q=")
}
// ParseAccept returns media ranges sorted by q (desc) then specificity.
func ParseAccept(header string) []mediaRange {
if header == "" {
return []mediaRange{{mt: "*/*", q: 1, spec: 0, raw: "*/*"}}
}
var out []mediaRange
for _, field := range strings.Split(header, ",") {
if r, ok := parseMediaRange(field); ok {
out = append(out, r)
}
}
sort.SliceStable(out, func(i, j int) bool {
ei, ej := hasExplicitQ(out[i]), hasExplicitQ(out[j])
if ei != ej {
return ei // explicit beats implicit
}
if out[i].q != out[j].q {
return out[i].q > out[j].q
}
return out[i].spec > out[j].spec
})
return out
}
// Matches reports whether the content type is acceptable per the header.
func Matches(header, ct string) bool {
for _, r := range ParseAccept(header) {
switch {
case r.q == 0:
continue
case r.mt == "*/*":
return true
case strings.HasSuffix(r.mt, "/*"):
if strings.HasPrefix(ct, r.mt[:len(r.mt)-1]) {
return true
}
case r.mt == ct:
return true
}
}
return false
}
// Negotiate selects the best server type according to the header.
// Returns the chosen type and true, or "", false when nothing matches.
func Negotiate(header string, serverTypes []string) (string, bool) {
for _, r := range ParseAccept(header) {
if r.q == 0 {
continue
}
for _, s := range serverTypes {
if Matches(r.mt, s) {
return s, true
}
}
}
return "", false
}
// PrimaryAcceptMatches reports whether the first acceptable (q > 0) media range matches the produced content type.
func PrimaryAcceptMatches(header, produced string) bool {
for _, r := range ParseAccept(header) {
if r.q == 0 {
continue // explicitly unacceptable, skip
}
return Matches(r.mt, produced)
}
return false
}
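For example, the helpers above can be used for content negotiation like this (a usage sketch; the header and server types are illustrative):

package main

import (
	"fmt"

	"github.com/OffchainLabs/prysm/v6/api/apiutil"
)

func main() {
	header := "application/octet-stream;q=0.9, application/json;q=0.5"
	serverTypes := []string{"application/json", "application/octet-stream"}

	// Negotiate walks the parsed media ranges in priority order and returns
	// the first server type the client accepts.
	if ct, ok := apiutil.Negotiate(header, serverTypes); ok {
		fmt.Println(ct) // application/octet-stream
	}

	// Matches answers the simpler question: is this single type acceptable?
	fmt.Println(apiutil.Matches(header, "application/json")) // true

	// PrimaryAcceptMatches only consults the client's top-priority range.
	fmt.Println(apiutil.PrimaryAcceptMatches(header, "application/json")) // false
}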

174
api/apiutil/header_test.go Normal file
View File

@@ -0,0 +1,174 @@
package apiutil
import (
"testing"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
func TestParseAccept(t *testing.T) {
type want struct {
mt string
q float64
spec int
}
cases := []struct {
name string
header string
want []want
}{
{
name: "empty header becomes */*;q=1",
header: "",
want: []want{{mt: "*/*", q: 1, spec: 0}},
},
{
name: "quality ordering then specificity",
header: "application/json;q=0.2, */*;q=0.1, application/xml;q=0.5, text/*;q=0.5",
want: []want{
{mt: "application/xml", q: 0.5, spec: 2},
{mt: "text/*", q: 0.5, spec: 1},
{mt: "application/json", q: 0.2, spec: 2},
{mt: "*/*", q: 0.1, spec: 0},
},
},
{
name: "invalid pieces are skipped",
header: "text/plain; q=boom, application/json",
want: []want{{mt: "application/json", q: 1, spec: 2}},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := ParseAccept(tc.header)
gotProjected := make([]want, len(got))
for i, g := range got {
gotProjected[i] = want{mt: g.mt, q: g.q, spec: g.spec}
}
require.DeepEqual(t, gotProjected, tc.want)
})
}
}
func TestMatches(t *testing.T) {
cases := []struct {
name string
accept string
ct string
matches bool
}{
{"exact match", "application/json", "application/json", true},
{"type wildcard", "application/*;q=0.8", "application/xml", true},
{"global wildcard", "*/*;q=0.1", "image/png", true},
{"explicitly unacceptable (q=0)", "text/*;q=0", "text/plain", false},
{"no match", "image/png", "application/json", false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := Matches(tc.accept, tc.ct)
require.Equal(t, tc.matches, got)
})
}
}
func TestNegotiate(t *testing.T) {
cases := []struct {
name string
accept string
serverTypes []string
wantType string
ok bool
}{
{
name: "highest quality wins",
accept: "application/json;q=0.8,application/xml;q=0.9",
serverTypes: []string{"application/json", "application/xml"},
wantType: "application/xml",
ok: true,
},
{
name: "wildcard matches first server type",
accept: "*/*;q=0.5",
serverTypes: []string{"application/octet-stream", "application/json"},
wantType: "application/octet-stream",
ok: true,
},
{
name: "no acceptable type",
accept: "image/png",
serverTypes: []string{"application/json"},
wantType: "",
ok: false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got, ok := Negotiate(tc.accept, tc.serverTypes)
require.Equal(t, tc.ok, ok)
require.Equal(t, tc.wantType, got)
})
}
}
func TestPrimaryAcceptMatches(t *testing.T) {
tests := []struct {
name string
accept string
produced string
expect bool
}{
{
name: "prefers json",
accept: "application/json;q=0.9,application/xml",
produced: "application/json",
expect: true,
},
{
name: "wildcard application beats other wildcard",
accept: "application/*;q=0.2,*/*;q=0.1",
produced: "application/xml",
expect: true,
},
{
name: "json wins",
accept: "application/xml;q=0.8,application/json;q=0.9",
produced: "application/json",
expect: true,
},
{
name: "json loses",
accept: "application/xml;q=0.8,application/json;q=0.9,application/octet-stream;q=0.99",
produced: "application/json",
expect: false,
},
{
name: "json wins with non q option",
accept: "application/xml;q=0.8,image/png,application/json;q=0.9",
produced: "application/json",
expect: true,
},
{
name: "json not primary",
accept: "image/png,application/json",
produced: "application/json",
expect: false,
},
{
name: "absent header",
accept: "",
produced: "text/plain",
expect: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := PrimaryAcceptMatches(tc.accept, tc.produced)
require.Equal(t, got, tc.expect)
})
}
}

View File

@@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"@com_github_rs_cors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],

View File

@@ -7,6 +7,7 @@ import (
"strings"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/apiutil"
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
)
@@ -74,42 +75,10 @@ func ContentTypeHandler(acceptedMediaTypes []string) Middleware {
func AcceptHeaderHandler(serverAcceptedTypes []string) Middleware {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
acceptHeader := r.Header.Get("Accept")
// header is optional and should skip if not provided
if acceptHeader == "" {
next.ServeHTTP(w, r)
if _, ok := apiutil.Negotiate(r.Header.Get("Accept"), serverAcceptedTypes); !ok {
http.Error(w, "Not Acceptable", http.StatusNotAcceptable)
return
}
accepted := false
acceptTypes := strings.Split(acceptHeader, ",")
// follows rules defined in https://datatracker.ietf.org/doc/html/rfc2616#section-14.1
for _, acceptType := range acceptTypes {
acceptType = strings.TrimSpace(acceptType)
if acceptType == "*/*" {
accepted = true
break
}
for _, serverAcceptedType := range serverAcceptedTypes {
if strings.HasPrefix(acceptType, serverAcceptedType) {
accepted = true
break
}
if acceptType != "/*" && strings.HasSuffix(acceptType, "/*") && strings.HasPrefix(serverAcceptedType, acceptType[:len(acceptType)-2]) {
accepted = true
break
}
}
if accepted {
break
}
}
if !accepted {
http.Error(w, fmt.Sprintf("Not Acceptable: %s", acceptHeader), http.StatusNotAcceptable)
return
}
next.ServeHTTP(w, r)
})
}
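A wiring sketch for the slimmed-down middleware (the route and accepted types are illustrative, and `AcceptHeaderHandler` is assumed in scope from the middleware package above):

package main

import "net/http"

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/eth/v1/example", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(`{}`))
	})

	// Requests whose Accept header cannot be satisfied by either type are
	// rejected with 406 before reaching the mux. An absent Accept header
	// parses as */*;q=1 and passes.
	accepted := []string{"application/json", "application/octet-stream"}
	_ = http.ListenAndServe(":8080", AcceptHeaderHandler(accepted)(mux))
}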

View File

@@ -11,7 +11,7 @@ import (
)
var (
// https://github.com/ethereum/consensus-specs/blob/dev/presets/mainnet/trusted_setups/trusted_setup_4096.json
// https://github.com/ethereum/consensus-specs/blob/master/presets/mainnet/trusted_setups/trusted_setup_4096.json
//go:embed trusted_setup_4096.json
embeddedTrustedSetup []byte // 1.2Mb
kzgContext *GoKZG.Context

View File

@@ -177,7 +177,7 @@ func (s *Service) processAttestations(ctx context.Context, disparity time.Durati
for _, a := range atts {
// Based on the spec, don't process the attestation until the subsequent slot.
// This delays consideration in the fork choice until their slot is in the past.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#validate_on_attestation
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/fork-choice.md#validate_on_attestation
nextSlot := a.GetData().Slot + 1
if err := slots.VerifyTime(s.genesisTime, nextSlot, disparity); err != nil {
continue

View File

@@ -206,9 +206,9 @@ func ParseWeakSubjectivityInputString(wsCheckpointString string) (*v1alpha1.Chec
// MinEpochsForBlockRequests computes the number of epochs of block history that we need to maintain,
// relative to the current epoch, per the p2p specs. This is used to compute the slot where backfill is complete.
// value defined:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#configuration
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#configuration
// MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2 (= 33024, ~5 months)
// detailed rationale: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
// detailed rationale: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
func MinEpochsForBlockRequests() primitives.Epoch {
return params.BeaconConfig().MinValidatorWithdrawabilityDelay +
primitives.Epoch(params.BeaconConfig().ChurnLimitQuotient/2)
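For mainnet configuration values (MIN_VALIDATOR_WITHDRAWABILITY_DELAY = 256 epochs, CHURN_LIMIT_QUOTIENT = 65536), the value works out to:

MIN_EPOCHS_FOR_BLOCK_REQUESTS = MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2
                              = 256 + 32768
                              = 33024 epochs ≈ 146 days (~5 months)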

View File

@@ -292,7 +292,7 @@ func TestMinEpochsForBlockRequests(t *testing.T) {
params.SetActiveTestCleanup(t, params.MainnetConfig())
var expected primitives.Epoch = 33024
// expected value of 33024 via spec commentary:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
// MIN_EPOCHS_FOR_BLOCK_REQUESTS is calculated using the arithmetic from compute_weak_subjectivity_period found in the weak subjectivity guide. Specifically to find this max epoch range, we use the worst case event of a very large validator size (>= MIN_PER_EPOCH_CHURN_LIMIT * CHURN_LIMIT_QUOTIENT).
//
// MIN_EPOCHS_FOR_BLOCK_REQUESTS = (

View File

@@ -126,7 +126,7 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, curre
return errors.Wrap(err, "entry filter")
}
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
verifier := s.newDataColumnsVerifier(roDataColumns, verification.ByRangeRequestDataColumnSidecarRequirements)
if err := verifier.ValidFields(); err != nil {

View File

@@ -75,7 +75,7 @@ data-columns
Computation of the maximum size of a DataColumnSidecar
------------------------------------------------------
https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/das-core.md#datacolumnsidecar
https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#datacolumnsidecar
class DataColumnSidecar(Container):

View File

@@ -104,6 +104,7 @@ go_library(
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr//net:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",

View File

@@ -1,7 +1,7 @@
/*
Package p2p implements the Ethereum consensus networking specification.
Canonical spec reference: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md
Canonical spec reference: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md
Prysm specific implementation design docs
- Networking Design Doc: https://docs.google.com/document/d/1VyhobQRkEjEkEPxmmdWvaHfKWn0j6dEae_wLZlrFtfU/view

View File

@@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -79,8 +80,8 @@ func (s *Service) disconnectFromPeerOnError(
// and validating the response from the peer.
func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Context, id peer.ID) error) {
// Peer map and lock to keep track of current connection attempts.
var peerLock sync.Mutex
peerMap := make(map[peer.ID]bool)
peerLock := new(sync.Mutex)
// This is run at the start of each connection attempt, to ensure
// that there aren't multiple inflight connection requests for the
@@ -108,6 +109,19 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, conn network.Conn) {
remotePeer := conn.RemotePeer()
log := log.WithField("peer", remotePeer)
direction := conn.Stat().Direction
// For some reason, right after a disconnection, this `ConnectedF` callback
// is called. We want to avoid processing this connection if the peer was
// disconnected too recently and we initiated this connection.
// This is very probably a bug in libp2p.
if direction == network.DirOutbound {
if err := s.wasDisconnectedTooRecently(remotePeer); err != nil {
log.WithError(err).Debug("Skipping connection handler")
return
}
}
// Connection handler must be non-blocking as part of libp2p design.
go func() {
@@ -133,53 +147,56 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
return
}
// Do not perform handshake on inbound dials.
if conn.Stat().Direction == network.DirInbound {
_, err := s.peers.ChainState(remotePeer)
peerExists := err == nil
currentTime := prysmTime.Now()
// Wait for peer to initiate handshake
time.Sleep(timeForStatus)
// Exit if we are disconnected with the peer.
if s.host.Network().Connectedness(remotePeer) != network.Connected {
if direction != network.DirInbound {
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
return
}
// If peer hasn't sent a status request, we disconnect with them
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
statusMessageMissing.Inc()
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
return
}
if peerExists {
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
if err != nil {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
return
}
// Exit if we don't receive any current status messages from peer.
if updated.IsZero() {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
return
}
if updated.Before(currentTime) {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
return
}
}
s.connectToPeer(conn)
return
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
// The connection is inbound.
_, err = s.peers.ChainState(remotePeer)
peerExists := err == nil
currentTime := prysmTime.Now()
// Wait for peer to initiate handshake
time.Sleep(timeForStatus)
// Exit if we are disconnected with the peer.
if s.host.Network().Connectedness(remotePeer) != network.Connected {
return
}
// If peer hasn't sent a status request, we disconnect with them
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
statusMessageMissing.Inc()
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
return
}
if !peerExists {
s.connectToPeer(conn)
return
}
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
if err != nil {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
return
}
// Exit if we don't receive any current status messages from peer.
if updated.IsZero() {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
return
}
if updated.Before(currentTime) {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
return
}
@@ -220,6 +237,12 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
}
s.peers.SetConnectionState(peerID, peers.Disconnected)
if err := s.peerDisconnectionTime.Add(peerID.String(), time.Now(), cache.DefaultExpiration); err != nil {
// The `DisconnectedF` function was already called for this peer less than `cache.DefaultExpiration` ago. Skip.
// (Very probably a bug in libp2p.)
log.WithError(err).Trace("Failed to set peer disconnection time")
return
}
// Only log disconnections if we were fully connected.
if priorState == peers.Connected {
@@ -231,6 +254,28 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
})
}
// wasDisconnectedTooRecently checks if the peer was disconnected within the last second.
func (s *Service) wasDisconnectedTooRecently(peerID peer.ID) error {
const disconnectionDurationThreshold = 1 * time.Second
peerDisconnectionTimeObj, ok := s.peerDisconnectionTime.Get(peerID.String())
if !ok {
return nil
}
peerDisconnectionTime, ok := peerDisconnectionTimeObj.(time.Time)
if !ok {
return errors.New("invalid peer disconnection time type")
}
timeSinceDisconnection := time.Since(peerDisconnectionTime)
if timeSinceDisconnection < disconnectionDurationThreshold {
return errors.Errorf("peer %s was disconnected too recently: %s", peerID, timeSinceDisconnection)
}
return nil
}
func agentString(pid peer.ID, hst host.Host) string {
rawVersion, storeErr := hst.Peerstore().Get(pid, agentVersionKey)

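The duplicate-callback guard above relies on go-cache's Add semantics: Add returns a non-nil error when the key already exists and has not expired. A standalone sketch (assuming the standard patrickmn/go-cache API):

package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

func main() {
	// Mirrors the service setup: entries expire after 1 second,
	// the janitor sweeps expired entries every minute.
	c := cache.New(1*time.Second, 1*time.Minute)

	// First disconnection: recorded, Add returns nil.
	fmt.Println(c.Add("peerA", time.Now(), cache.DefaultExpiration))

	// Duplicate DisconnectedF within 1 second: Add returns an error,
	// so the handler skips the redundant bookkeeping.
	fmt.Println(c.Add("peerA", time.Now(), cache.DefaultExpiration))

	// Once the entry expires, a fresh disconnection is recorded again.
	time.Sleep(1100 * time.Millisecond)
	fmt.Println(c.Add("peerA", time.Now(), cache.DefaultExpiration))
}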
View File

@@ -101,21 +101,24 @@ func (s *BadResponsesScorer) countNoLock(pid peer.ID) (int, error) {
// Increment increments the number of bad responses we have received from the given remote peer.
// If peer doesn't exist this method is no-op.
func (s *BadResponsesScorer) Increment(pid peer.ID) {
func (s *BadResponsesScorer) Increment(pid peer.ID) int {
if pid == "" {
return
return 0
}
s.store.Lock()
defer s.store.Unlock()
peerData, ok := s.store.PeerData(pid)
if !ok {
s.store.SetPeerData(pid, &peerdata.PeerData{
BadResponses: 1,
})
return
if ok {
peerData.BadResponses++
return peerData.BadResponses
}
peerData.BadResponses++
const badResponses = 1
peerData = &peerdata.PeerData{BadResponses: badResponses}
s.store.SetPeerData(pid, peerData)
return badResponses
}
// IsBadPeer states if the peer is to be considered bad.

View File

@@ -31,6 +31,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -86,6 +87,7 @@ type Service struct {
genesisTime time.Time
genesisValidatorsRoot []byte
activeValidatorCount uint64
peerDisconnectionTime *cache.Cache
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
@@ -115,16 +117,17 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ipLimiter := leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */)
s := &Service{
ctx: ctx,
cancel: cancel,
cfg: cfg,
addrFilter: addrFilter,
ipLimiter: ipLimiter,
privKey: privKey,
metaData: metaData,
isPreGenesis: true,
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
subnetsLock: make(map[uint64]*sync.RWMutex),
ctx: ctx,
cancel: cancel,
cfg: cfg,
addrFilter: addrFilter,
ipLimiter: ipLimiter,
privKey: privKey,
metaData: metaData,
isPreGenesis: true,
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
subnetsLock: make(map[uint64]*sync.RWMutex),
peerDisconnectionTime: cache.New(1*time.Second, 1*time.Minute),
}
ipAddr := prysmnetwork.IPAddr()
@@ -486,14 +489,17 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error
if info.ID == s.host.ID() {
return nil
}
if err := s.Peers().IsBad(info.ID); err != nil {
return errors.Wrap(err, "refused to connect to bad peer")
return errors.Wrap(err, "bad peer")
}
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()
if err := s.host.Connect(ctx, info); err != nil {
s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
return err
s.downscorePeer(info.ID, "connectionError")
return errors.Wrap(err, "peer connect")
}
return nil
}
@@ -524,3 +530,8 @@ func (s *Service) connectToBootnodes() error {
func (s *Service) isInitialized() bool {
return !s.genesisTime.IsZero() && len(s.genesisValidatorsRoot) == 32
}
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -403,7 +403,7 @@ func TestService_connectWithPeer(t *testing.T) {
return ps
}(),
info: peer.AddrInfo{ID: "bad"},
wantErr: "refused to connect to bad peer",
wantErr: "bad peer",
},
}
for _, tt := range tests {

View File

@@ -181,6 +181,11 @@ func (s *Service) findPeersWithSubnets(
// Get all needed subnets that the node is subscribed to.
// Skip nodes that are not subscribed to any of the defective subnets.
node := iterator.Node()
if !s.filterPeer(node) {
continue
}
nodeSubnets, err := filter(node)
if err != nil {
return nil, errors.Wrap(err, "filter node")

View File

@@ -1029,8 +1029,8 @@ func (s *Server) GetProposerDuties(w http.ResponseWriter, r *http.Request) {
httputil.HandleError(w, fmt.Sprintf("Could not get head state: %v ", err), http.StatusInternalServerError)
return
}
// Advance state with empty transitions up to the requested epoch start slot.
if st.Slot() < epochStartSlot {
// Advance the state with empty transitions up to the requested epoch start slot for pre-Fulu states only. Fulu states use the proposer lookahead field instead.
if st.Slot() < epochStartSlot && st.Version() != version.Fulu {
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
if err != nil {
httputil.HandleError(w, fmt.Sprintf("Could not get head root: %v ", err), http.StatusInternalServerError)

View File

@@ -2645,6 +2645,78 @@ func TestGetProposerDuties(t *testing.T) {
})
}
func TestGetProposerDuties_FuluState(t *testing.T) {
helpers.ClearCache()
// Create a Fulu state with slot 0 (before the epoch 1 start slot, which is 32)
fuluState, err := util.NewBeaconStateFulu()
require.NoError(t, err)
require.NoError(t, fuluState.SetSlot(0)) // Set to slot 0
// Create some validators for the test
depChainStart := params.BeaconConfig().MinGenesisActiveValidatorCount
deposits, _, err := util.DeterministicDepositsAndKeys(depChainStart)
require.NoError(t, err)
validators := make([]*ethpbalpha.Validator, len(deposits))
for i, deposit := range deposits {
validators[i] = &ethpbalpha.Validator{
PublicKey: deposit.Data.PublicKey,
ActivationEpoch: 0,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
WithdrawalCredentials: make([]byte, 32),
}
}
require.NoError(t, fuluState.SetValidators(validators))
// Set up block roots
genesis := util.NewBeaconBlock()
genesisRoot, err := genesis.Block.HashTreeRoot()
require.NoError(t, err)
roots := make([][]byte, fieldparams.BlockRootsLength)
roots[0] = genesisRoot[:]
require.NoError(t, fuluState.SetBlockRoots(roots))
chainSlot := primitives.Slot(0)
chain := &mockChain.ChainService{
State: fuluState, Root: genesisRoot[:], Slot: &chainSlot,
}
db := dbutil.SetupDB(t)
require.NoError(t, db.SaveGenesisBlockRoot(t.Context(), genesisRoot))
s := &Server{
Stater: &testutil.MockStater{StatesBySlot: map[primitives.Slot]state.BeaconState{0: fuluState}},
HeadFetcher: chain,
TimeFetcher: chain,
OptimisticModeFetcher: chain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
TrackedValidatorsCache: cache.NewTrackedValidatorsCache(),
BeaconDB: db,
}
// Request epoch 1 duties, which should require advancing from slot 0 to slot 32
// But for Fulu state, this advancement should be skipped
request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil)
request.SetPathValue("epoch", "1")
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.GetProposerDuties(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
// Verify the state was not advanced - it should still be at slot 0
// This is the key assertion for the regression test
assert.Equal(t, primitives.Slot(0), fuluState.Slot(), "Fulu state should not have been advanced")
resp := &structs.GetProposerDutiesResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
// Should still return proposer duties despite not advancing the state
assert.Equal(t, true, len(resp.Data) > 0, "Should return proposer duties even without state advancement")
}
func TestGetSyncCommitteeDuties(t *testing.T) {
helpers.ClearCache()
params.SetupTestConfigCleanup(t)

View File

@@ -315,7 +315,7 @@ func (bs *Server) ListIndexedAttestationsElectra(
// that it was included in a block. The attestation may have expired.
// Refer to the ethereum consensus specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
func (bs *Server) AttestationPool(_ context.Context, req *ethpb.AttestationPoolRequest) (*ethpb.AttestationPoolResponse, error) {
var atts []*ethpb.Attestation
var err error

View File

@@ -262,7 +262,7 @@ func (vs *Server) activationStatus(
// It cannot faithfully attest to the head block of the chain, since it has not fully verified that block.
//
// Spec:
// https://github.com/ethereum/consensus-specs/blob/dev/sync/optimistic.md
// https://github.com/ethereum/consensus-specs/blob/master/sync/optimistic.md
func (vs *Server) optimisticStatus(ctx context.Context) error {
if slots.ToEpoch(vs.TimeFetcher.CurrentSlot()) < params.BeaconConfig().BellatrixForkEpoch {
return nil

View File

@@ -17,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type Service struct {
@@ -211,7 +212,7 @@ func (s *Service) importBatches(ctx context.Context) {
_, err := s.batchImporter(ctx, current, ib, s.store)
if err != nil {
log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import")
s.downscore(ib)
s.downscorePeer(ib.blockPid, "backfillBatchImportError")
s.batchSeq.update(ib.withState(batchErrRetryable))
// If a batch fails, the subsequent batches are no longer considered importable.
break
@@ -336,10 +337,6 @@ func (s *Service) initBatches() error {
return nil
}
func (s *Service) downscore(b batch) {
s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid)
}
func (*Service) Stop() error {
return nil
}
@@ -383,3 +380,8 @@ func (s *Service) WaitForCompletion() error {
return nil
}
}
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -337,14 +337,15 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
}
}
}
if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) {
// Peer returned invalid data, penalize.
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blocksFrom)
log.WithField("pid", response.blocksFrom).Debug("Peer is penalized for invalid blocks")
} else if errors.Is(response.err, verification.ErrBlobInvalid) {
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blobsFrom)
log.WithField("pid", response.blobsFrom).Debug("Peer is penalized for invalid blob response")
q.downscorePeer(response.blocksFrom, "invalidBlocks")
}
if errors.Is(response.err, verification.ErrBlobInvalid) {
q.downscorePeer(response.blobsFrom, "invalidBlobs")
}
return m.state, response.err
}
m.fetched = *response
@@ -455,6 +456,11 @@ func (q *blocksQueue) onProcessSkippedEvent(ctx context.Context) eventHandlerFn
}
}
func (q *blocksQueue) downscorePeer(peerID peer.ID, reason string) {
newScore := q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}
// onCheckStaleEvent is an event that allows to mark stale epochs,
// so that they can be re-processed.
func onCheckStaleEvent(ctx context.Context) eventHandlerFn {

View File

@@ -522,7 +522,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
})
assert.ErrorContains(t, beaconsync.ErrInvalidFetchedData.Error(), err)
assert.Equal(t, stateScheduled, updatedState)
assert.LogsContain(t, hook, "msg=\"Peer is penalized for invalid blocks\" pid=ZiCa")
assert.LogsContain(t, hook, "Downscore peer")
})
t.Run("transition ok", func(t *testing.T) {

View File

@@ -14,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -345,12 +346,11 @@ func isPunishableError(err error) bool {
func (s *Service) updatePeerScorerStats(data *blocksQueueFetchedData, count uint64, err error) {
if isPunishableError(err) {
if verification.IsBlobValidationFailure(err) {
log.WithError(err).WithField("peer_id", data.blobsFrom).Warn("Downscoring peer for invalid blobs")
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blobsFrom)
s.downscorePeer(data.blobsFrom, "invalidBlobs")
} else {
log.WithError(err).WithField("peer_id", data.blocksFrom).Warn("Downscoring peer for invalid blocks")
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blocksFrom)
s.downscorePeer(data.blocksFrom, "invalidBlocks")
}
// If the error is punishable, exit here so that we don't give them credit for providing bad blocks.
return
}
@@ -376,3 +376,8 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool
}
return false
}
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
@@ -122,7 +123,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
collector, err := l.retrieveCollector(topic)
if err != nil {
return err
return errors.Wrap(err, "retrieve collector")
}
remaining := collector.Remaining(remotePeer.String())
@@ -131,7 +132,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
amt = 1
}
if amt > uint64(remaining) {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
l.downscorePeer(remotePeer, topic, "rateLimitExceeded")
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
}
@@ -139,22 +140,20 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
}
// This is used to validate all incoming rpc streams from external peers.
func (l *limiter) validateRawRpcRequest(stream network.Stream) error {
func (l *limiter) validateRawRpcRequest(stream network.Stream, amt uint64) error {
l.RLock()
defer l.RUnlock()
topic := rpcLimiterTopic
collector, err := l.retrieveCollector(topic)
remotePeer := stream.Conn().RemotePeer()
collector, err := l.retrieveCollector(rpcLimiterTopic)
if err != nil {
return err
}
key := stream.Conn().RemotePeer().String()
remaining := collector.Remaining(key)
// Treat each request as a minimum of 1.
amt := int64(1)
if amt > remaining {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
if amt > uint64(remaining) {
l.downscorePeer(remotePeer, rpcLimiterTopic, "rawRateLimitExceeded")
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
}
@@ -233,3 +232,13 @@ func (l *limiter) retrieveCollector(topic string) (*leakybucket.Collector, error
func (_ *limiter) topicLogger(topic string) *logrus.Entry {
return log.WithField("rateLimiter", topic)
}
func (l *limiter) downscorePeer(peerID peer.ID, topic, reason string) {
newScore := l.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{
"peerID": peerID.String(),
"reason": reason,
"newScore": newScore,
"topic": topic,
}).Debug("Downscore peer")
}

View File

@@ -85,16 +85,16 @@ func TestRateLimiter_ExceedRawCapacity(t *testing.T) {
require.NoError(t, err, "could not create stream")
for i := 0; i < 2*defaultBurstLimit; i++ {
err = rlimiter.validateRawRpcRequest(stream)
err = rlimiter.validateRawRpcRequest(stream, 1)
rlimiter.addRawStream(stream)
require.NoError(t, err, "could not validate incoming request")
}
// Triggers rate limit error on burst.
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream))
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream, 1))
// Make Peer bad.
for i := 0; i < defaultBurstLimit; i++ {
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream))
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream, 1))
}
assert.NotNil(t, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer")
require.NoError(t, stream.Close(), "could not close stream")

View File

@@ -20,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/sirupsen/logrus"
)
var (
@@ -39,7 +40,7 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
// Fulu: https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#messages
// Fulu: https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#messages
if forkIndex >= version.Fulu {
return map[string]rpcHandler{
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
@@ -54,7 +55,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
}, nil
}
// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
// Electra: https://github.com/ethereum/consensus-specs/blob/master/specs/electra/p2p-interface.md#messages
if forkIndex >= version.Electra {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -68,7 +69,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
}, nil
}
// Deneb: https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#messages
// Deneb: https://github.com/ethereum/consensus-specs/blob/master/specs/deneb/p2p-interface.md#messages
if forkIndex >= version.Deneb {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -82,9 +83,9 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
}, nil
}
// Capella: https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/p2p-interface.md#messages
// Bellatrix: https://github.com/ethereum/consensus-specs/blob/dev/specs/bellatrix/p2p-interface.md#messages
// Altair: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/p2p-interface.md#messages
// Capella: https://github.com/ethereum/consensus-specs/blob/master/specs/capella/p2p-interface.md#messages
// Bellatrix: https://github.com/ethereum/consensus-specs/blob/master/specs/bellatrix/p2p-interface.md#messages
// Altair: https://github.com/ethereum/consensus-specs/blob/master/specs/altair/p2p-interface.md#messages
if forkIndex >= version.Altair {
handler := map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -105,7 +106,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
return handler, nil
}
// PhaseO: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages
// PhaseO: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#messages
if forkIndex >= version.Phase0 {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
@@ -238,7 +239,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
defer span.End()
span.SetAttributes(trace.StringAttribute("topic", topic))
span.SetAttributes(trace.StringAttribute("peer", remotePeer.String()))
log := log.WithField("peer", stream.Conn().RemotePeer().String()).WithField("topic", string(stream.Protocol()))
log := log.WithFields(logrus.Fields{"peer": remotePeer.String(), "topic": string(stream.Protocol())})
// Check before hand that peer is valid.
if err := s.cfg.p2p.Peers().IsBad(remotePeer); err != nil {
@@ -248,7 +249,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
// Validate request according to peer limits.
if err := s.rateLimiter.validateRawRpcRequest(stream); err != nil {
if err := s.rateLimiter.validateRawRpcRequest(stream, 1); err != nil {
log.WithError(err).Debug("Could not validate rpc request from peer")
return
}
@@ -304,7 +305,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
logStreamErrors(err, topic)
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "registerRpcError")
return
}
if err := handle(ctx, msg, stream); err != nil {
@@ -324,7 +325,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
logStreamErrors(err, topic)
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "registerRpcError")
return
}
if err := handle(ctx, nTyp.Elem().Interface(), stream); err != nil {

View File

@@ -15,6 +15,7 @@ import (
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -43,7 +44,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "beaconBlocksByRangeRPCHandlerValidationError")
tracing.AnnotateError(span, err)
return err
}
@@ -201,3 +202,13 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch,
return nil
}
func (s *Service) downscorePeer(peerID peer.ID, reason string, fields ...logrus.Fields) {
log := log
for _, field := range fields {
log = log.WithFields(field)
}
newScore := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

View File

@@ -92,9 +92,11 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return errors.New("no block roots provided")
}
remotePeer := stream.Conn().RemotePeer()
currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
if uint64(len(blockRoots)) > params.MaxRequestBlock(currentEpoch) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "beaconBlocksRootRPCHandlerTooManyRoots")
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
}

View File

@@ -74,10 +74,13 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
return err
}
remotePeer := stream.Conn().RemotePeer()
rp, err := validateBlobsByRange(r, s.cfg.chain.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "blobSidecarsByRangeRpcHandlerValidationError")
tracing.AnnotateError(span, err)
return err
}
@@ -87,7 +90,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
defer ticker.Stop()
batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)
if err != nil {
log.WithError(err).Info("error in BlobSidecarsByRange batch")
log.WithError(err).Error("Cannot create new block range batcher")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
return err
@@ -112,7 +115,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
}
}
if err := batch.error(); err != nil {
log.WithError(err).Debug("error in BlobSidecarsByRange batch")
log.WithError(err).Debug("Error in BlobSidecarsByRange batch")
// If a rate limit is hit, it means an error response has already been sent and the stream has been closed.
if !errors.Is(err, p2ptypes.ErrRateLimited) {

View File

@@ -39,7 +39,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
cs := s.cfg.clock.CurrentSlot()
remotePeer := stream.Conn().RemotePeer()
if err := validateBlobByRootRequest(blobIdents, cs); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "blobSidecarsByRootRpcHandlerValidationError")
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return err
}

View File

@@ -65,7 +65,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
rangeParameters, err := validateDataColumnsByRange(request, s.cfg.chain.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "dataColumnSidecarsByRangeRpcHandlerValidationError")
tracing.AnnotateError(span, err)
return errors.Wrap(err, "validate data columns by range")
}

View File

@@ -27,7 +27,7 @@ var (
)
// dataColumnSidecarByRootRPCHandler handles the data column sidecars by root RPC request.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler")
defer span.End()
@@ -42,7 +42,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}
requestedColumnIdents := *ref
remotePeerId := stream.Conn().RemotePeer()
remotePeer := stream.Conn().RemotePeer()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
@@ -51,7 +51,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
// Penalize peers that send invalid requests.
if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeerId)
s.downscorePeer(remotePeer, "dataColumnSidecarByRootRPCHandlerValidationError")
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return errors.Wrap(err, "validate data columns by root request")
}
@@ -85,7 +85,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}
log := log.WithFields(logrus.Fields{
"peer": remotePeerId,
"peer": remotePeer,
"columns": requestedColumnsByRootLog,
})

View File

@@ -13,6 +13,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -35,22 +36,40 @@ var backOffTime = map[primitives.SSZUint64]time.Duration{
// goodbyeRPCHandler reads the incoming goodbye rpc message from the peer.
func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
const amount = 1
SetRPCStreamDeadlines(stream)
peerID := stream.Conn().RemotePeer()
m, ok := msg.(*primitives.SSZUint64)
if !ok {
return fmt.Errorf("wrong message type for goodbye, got %T, wanted *uint64", msg)
}
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
log.WithError(err).Debug("Goodbye message from rate-limited peer")
} else {
isRateLimitedPeer := false
if err := s.rateLimiter.validateRequest(stream, amount); err != nil {
if !errors.Is(err, p2ptypes.ErrRateLimited) {
return errors.Wrap(err, "validate request")
}
isRateLimitedPeer = true
}
if !isRateLimitedPeer {
s.rateLimiter.add(stream, 1)
}
log := log.WithField("Reason", goodbyeMessage(*m))
log.WithField("peer", stream.Conn().RemotePeer()).Trace("Peer has sent a goodbye message")
s.cfg.p2p.Peers().SetNextValidTime(stream.Conn().RemotePeer(), goodByeBackoff(*m))
// closes all streams with the peer
return s.cfg.p2p.Disconnect(stream.Conn().RemotePeer())
log.WithFields(logrus.Fields{
"peer": peerID,
"reason": goodbyeMessage(*m),
"isRateLimited": isRateLimitedPeer,
}).Debug("Received a goodbye message")
s.cfg.p2p.Peers().SetNextValidTime(peerID, goodByeBackoff(*m))
if err := s.cfg.p2p.Disconnect(peerID); err != nil {
return errors.Wrap(err, "disconnect")
}
return nil
}
// disconnectBadPeer checks whether peer is considered bad by some scorer, and tries to disconnect
@@ -70,7 +89,7 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr
"peerID": id,
"agent": agentString(id, s.cfg.p2p.Host()),
}).
Debug("Sent peer disconnection")
Debug("Sent bad peer disconnection")
}
// A custom goodbye method that is used by our connection handler, in the

View File

@@ -26,7 +26,7 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)
@@ -42,7 +42,7 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
logger.WithError(err).Error("s.cfg.beaconDB.LightClientBootstrap")
logger.WithError(err).Error("Cannot bootstrap light client")
return err
}
if bootstrap == nil {
@@ -74,10 +74,11 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
defer cancel()
logger := log.WithField("handler", p2p.LightClientUpdatesByRangeName[1:])
remotePeer := stream.Conn().RemotePeer()
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)
@@ -90,7 +91,8 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
if r.Count == 0 {
s.writeErrorResponseToStream(responseCodeInvalidRequest, "count is 0", stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "lightClientUpdatesByRangeRPCHandlerCount0")
logger.Error("Count is 0")
return nil
}
@@ -102,7 +104,7 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
endPeriod, err := math.Add64(r.StartPeriod, r.Count-1)
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.downscorePeer(remotePeer, "lightClientUpdatesByRangeRPCHandlerEndPeriodOverflow")
tracing.AnnotateError(span, err)
logger.WithError(err).Error("End period overflows")
return err
@@ -114,7 +116,7 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
logger.WithError(err).Error("s.cfg.beaconDB.LightClientUpdates")
logger.WithError(err).Error("Cannot retrieve light client updates")
return err
}
@@ -153,7 +155,7 @@ func (s *Service) lightClientFinalityUpdateRPCHandler(ctx context.Context, _ int
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)
@@ -191,7 +193,7 @@ func (s *Service) lightClientOptimisticUpdateRPCHandler(ctx context.Context, _ i
SetRPCStreamDeadlines(stream)
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
logger.WithError(err).Error("s.rateLimiter.validateRequest")
logger.WithError(err).Error("Cannot validate request")
return err
}
s.rateLimiter.add(stream, 1)

View File

@@ -172,12 +172,12 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
// Read the METADATA response from the peer.
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
if err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.downscorePeer(peerID, "MetadataReadStatusCodeError")
return nil, errors.Wrap(err, "read status code")
}
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.downscorePeer(peerID, "NonNullMetadataReadStatusCode")
return nil, errors.New(errMsg)
}
@@ -214,8 +214,8 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
// Decode the metadata from the peer.
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return nil, err
s.downscorePeer(peerID, "MetadataDecodeError")
return nil, errors.Wrap(err, "decode with max length")
}
return msg, nil

View File

@@ -13,6 +13,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// pingHandler reads the incoming ping rpc message from the peer.
@@ -27,6 +28,7 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
if !ok {
return fmt.Errorf("wrong message type for ping, got %T, wanted *uint64", msg)
}
sequenceNumber := uint64(*m)
// Validate the incoming request regarding rate limiting.
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
@@ -39,14 +41,9 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
peerID := stream.Conn().RemotePeer()
// Check if the peer sequence number is higher than the one we have in our store.
valid, err := s.validateSequenceNum(*m, peerID)
valid, err := s.isSequenceNumberUpToDate(sequenceNumber, peerID)
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
return errors.Wrap(err, "validate sequence number")
}
@@ -141,7 +138,7 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
// If the peer responded with an error, downscore it.
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.downscorePeer(peerID, "NotNullPingReadStatusCode")
return errors.Errorf("code: %d - %s", code, errMsg)
}
@@ -150,15 +147,11 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
return errors.Wrap(err, "decode sequence number")
}
sequenceNumber := uint64(*msg)
// Determine whether the sequence number returned by the peer is higher than the one we have in our store.
valid, err := s.validateSequenceNum(*msg, peerID)
valid, err := s.isSequenceNumberUpToDate(sequenceNumber, peerID)
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
}
return errors.Wrap(err, "validate sequence number")
}
@@ -180,27 +173,36 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
return nil
}
// validateSequenceNum validates the peer's sequence number.
// - If the peer's sequence number is greater than the sequence number we have in our store for the peer, return false.
// - If the peer's sequence number is equal to the sequence number we have in our store for the peer, return true.
// - If the peer's sequence number is less than the sequence number we have in our store for the peer, return an error.
func (s *Service) validateSequenceNum(seq primitives.SSZUint64, id peer.ID) (bool, error) {
// isSequenceNumberUpToDate checks whether our stored sequence number for the peer is up to date with respect to the incoming one.
// - If the incoming sequence number is greater than the sequence number we have in our store for the peer, return false.
// - If the incoming sequence number is equal to the sequence number we have in our store for the peer, return true.
// - If the incoming sequence number is less than the sequence number we have in our store for the peer, return an error.
func (s *Service) isSequenceNumberUpToDate(incomingSequenceNumber uint64, peerID peer.ID) (bool, error) {
// Retrieve the metadata for the peer we got in our store.
md, err := s.cfg.p2p.Peers().Metadata(id)
storedMetadata, err := s.cfg.p2p.Peers().Metadata(peerID)
if err != nil {
return false, errors.Wrap(err, "get metadata")
return false, errors.Wrap(err, "peers metadata")
}
// If we have no metadata for the peer, return false.
if md == nil || md.IsNil() {
if storedMetadata == nil || storedMetadata.IsNil() {
return false, nil
}
// The sequence number we have in our store must be less than or equal to the peer's incoming sequence number.
if md.SequenceNumber() > uint64(seq) {
storedSequenceNumber := storedMetadata.SequenceNumber()
if storedSequenceNumber > incomingSequenceNumber {
s.downscorePeer(peerID, "pingInvalidSequenceNumber", logrus.Fields{
"storedSequenceNumber": storedSequenceNumber,
"incomingSequenceNumber": incomingSequenceNumber,
})
return false, p2ptypes.ErrInvalidSequenceNum
}
// Return true if the peer's sequence number is equal to the sequence number we have in our store.
return md.SequenceNumber() == uint64(seq), nil
// If the incoming sequence number is greater than the stored one, our information about the peer is outdated.
if storedSequenceNumber < incomingSequenceNumber {
return false, nil
}
return true, nil
}
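
A minimal sketch of how a caller reacts to the three outcomes documented above; the metadata-refresh step is an assumption for illustration, not part of this change:

// Illustrative only.
upToDate, err := s.isSequenceNumberUpToDate(incomingSequenceNumber, peerID)
switch {
case err != nil:
	// Incoming < stored: the peer regressed its sequence number and was
	// already downscored inside the function.
case !upToDate:
	// Incoming > stored (or no stored metadata): our view is stale; a
	// metadata request would refresh it (hypothetical follow-up).
default:
	// Incoming == stored: nothing to do.
}
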

View File

@@ -35,6 +35,9 @@ func (s *Service) maintainPeerStatuses() {
wg.Add(1)
go func(id peer.ID) {
defer wg.Done()
log := log.WithField("peer", id)
// If our peer status has not been updated correctly, we disconnect the peer here
// and set the connection state accordingly.
if s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected {
@@ -43,27 +46,26 @@ func (s *Service) maintainPeerStatuses() {
log.WithError(err).Debug("Error when disconnecting with peer")
}
s.cfg.p2p.Peers().SetConnectionState(id, peers.Disconnected)
log.WithFields(logrus.Fields{
"peer": id,
"reason": "maintain peer statuses - peer is not connected",
}).Debug("Initiate peer disconnection")
log.WithField("reason", "maintainPeerStatusesNotConnectedPeer").Debug("Initiate peer disconnection")
return
}
// Disconnect from peers that are considered bad by any of the registered scorers.
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
s.disconnectBadPeer(s.ctx, id, err)
return
}
// If the status hasn't been updated within the interval, re-validate the peer.
lastUpdated, err := s.cfg.p2p.Peers().ChainStateLastUpdated(id)
if err != nil {
// Peer has vanished; nothing to do.
return
}
if prysmTime.Now().After(lastUpdated.Add(interval)) {
if err := s.reValidatePeer(s.ctx, id); err != nil {
log.WithField("peer", id).WithError(err).Debug("Could not revalidate peer")
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
log.WithError(err).Debug("Cannot re-validate peer")
}
}
}(pid)
@@ -128,19 +130,20 @@ func (s *Service) shouldReSync() bool {
}
// sendRPCStatusRequest sends a status RPC request to the given peer and validates the response.
func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
func (s *Service) sendRPCStatusRequest(ctx context.Context, peer peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
headRoot, err := s.cfg.chain.HeadRoot(ctx)
if err != nil {
return err
return errors.Wrap(err, "head root")
}
forkDigest, err := s.currentForkDigest()
if err != nil {
return err
return errors.Wrap(err, "current fork digest")
}
cp := s.cfg.chain.FinalizedCheckpt()
resp := &pb.Status{
ForkDigest: forkDigest[:],
@@ -149,37 +152,42 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
HeadRoot: headRoot,
HeadSlot: s.cfg.chain.HeadSlot(),
}
log := log.WithField("peer", peer)
topic, err := p2p.TopicFromMessage(p2p.StatusMessageName, slots.ToEpoch(s.cfg.clock.CurrentSlot()))
if err != nil {
return err
return errors.Wrap(err, "topic from message")
}
stream, err := s.cfg.p2p.Send(ctx, resp, topic, id)
stream, err := s.cfg.p2p.Send(ctx, resp, topic, peer)
if err != nil {
return err
return errors.Wrap(err, "send p2p message")
}
defer closeStream(stream, log)
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
if err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return err
s.downscorePeer(peer, "statusRequestReadStatusCodeError")
return errors.Wrap(err, "read status code")
}
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
s.downscorePeer(peer, "statusRequestNonNullStatusCode")
return errors.New(errMsg)
}
msg := &pb.Status{}
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return err
s.downscorePeer(peer, "statusRequestDecodeError")
return errors.Wrap(err, "decode status message")
}
// If validation fails, the validation error is logged and the peer status scorer will mark the peer as bad.
err = s.validateStatusMessage(ctx, msg)
s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(id, msg, err)
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
s.disconnectBadPeer(s.ctx, id, err)
s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(peer, msg, err)
if err := s.cfg.p2p.Peers().IsBad(peer); err != nil {
s.disconnectBadPeer(s.ctx, peer, err)
}
return err
}
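
A side note on the errors.Wrap convention adopted throughout this hunk: each wrap prepends a short context string, so the final message reads as a breadcrumb trail. A minimal, self-contained illustration using github.com/pkg/errors:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	base := errors.New("connection reset")
	// Nested wraps mirror nested call sites such as Send inside sendRPCStatusRequest.
	err := errors.Wrap(errors.Wrap(base, "send p2p message"), "status request")
	fmt.Println(err) // status request: send p2p message: connection reset
}
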
@@ -238,7 +246,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
return nil
default:
respCode = responseCodeInvalidRequest
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.downscorePeer(remotePeer, "statusRpcHandlerInvalidMessage")
}
originalErr := err

View File

@@ -11,10 +11,12 @@ import (
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/crypto/rand"
lru "github.com/hashicorp/golang-lru"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
gcache "github.com/patrickmn/go-cache"
"github.com/pkg/errors"
@@ -278,19 +280,32 @@ func (s *Service) Start() {
// Stop the regular sync service.
func (s *Service) Stop() error {
defer func() {
s.cancel()
if s.rateLimiter != nil {
s.rateLimiter.free()
}
}()
// Say goodbye to all peers.
for _, peerID := range s.cfg.p2p.Peers().Connected() {
if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected {
if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeClientShutdown, peerID); err != nil {
log.WithError(err).WithField("peerID", peerID).Error("Failed to send goodbye message")
}
}
}
// Removing RPC Stream handlers.
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
s.cfg.p2p.Host().RemoveStreamHandler(p)
}
// Deregister Topic Subscribers.
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
s.unSubscribeFromTopic(t)
}
defer s.cancel()
return nil
}
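
Grouping s.cancel() and the rate limiter cleanup into a single deferred closure pins their relative order, since separate defers run last-in-first-out. A standalone reminder of that semantics:

package main

import "fmt"

func main() {
	defer fmt.Println("registered first, runs last")
	defer fmt.Println("registered last, runs first")
	fmt.Println("function body")
}
// Output:
// function body
// registered last, runs first
// registered first, runs last
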

View File

@@ -25,7 +25,7 @@ import (
"github.com/sirupsen/logrus"
)
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
const dataColumnSidecarSubTopic = "/data_column_sidecar_%d/"
@@ -74,7 +74,7 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)
// Start the verification process.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
// [REJECT] The sidecar is valid as verified by `verify_data_column_sidecar(sidecar)`.
if err := verifier.ValidFields(); err != nil {

View File

@@ -22,7 +22,7 @@ import (
var (
// GossipDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received on gossip
// must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
GossipDataColumnSidecarRequirements = []Requirement{
RequireValidFields,
RequireCorrectSubnet,
@@ -40,7 +40,7 @@ var (
// ByRangeRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
// via the by range request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
ByRangeRequestDataColumnSidecarRequirements = []Requirement{
RequireValidFields,
RequireSidecarInclusionProven,
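
These requirement sets feed the verifier construction shown earlier in this diff; a minimal sketch of the gating pattern (the return values are illustrative):

// Illustrative only: upgrading gossip data columns is gated on the requirement set.
verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)
if err := verifier.ValidFields(); err != nil {
	// [REJECT] per the spec section linked above.
	return pubsub.ValidationReject, err
}
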

View File

@@ -0,0 +1,7 @@
### Removed
- Partially reverted PR #15390 by removing the `ssz-only` debug flag until there is a real use case for it
### Added
- Added a new `PRYSM_API_OVERRIDE_ACCEPT` environment variable to override the SSZ `Accept` header, as a replacement for the removed flag

View File

@@ -0,0 +1,3 @@
### Changed
- Update links to consensus-specs to point to `master` branch

View File

@@ -0,0 +1,2 @@
### Fixed
- Fixed several causes of a node being banned by its peers when it gracefully stops.

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed Beacon API proposer duty computation for the Fulu fork

View File

@@ -51,7 +51,6 @@ type Flags struct {
EnableExperimentalAttestationPool bool // EnableExperimentalAttestationPool enables an experimental attestation pool design.
DisableDutiesV2 bool // DisableDutiesV2 sets validator client to use the get Duties endpoint
EnableWeb bool // EnableWeb enables the webui on the validator client
SSZOnly bool // SSZOnly forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled (useful for debugging)
// Logging related toggles.
DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.
EnableFullSSZDataLogging bool // Enables logging for full ssz data on rejected gossip messages
@@ -340,10 +339,6 @@ func ConfigureValidator(ctx *cli.Context) error {
logEnabled(EnableWebFlag)
cfg.EnableWeb = true
}
if ctx.Bool(SSZOnly.Name) {
logEnabled(SSZOnly)
cfg.SSZOnly = true
}
cfg.KeystoreImportDebounceInterval = ctx.Duration(dynamicKeyReloadDebounceInterval.Name)
Init(cfg)

View File

@@ -197,12 +197,6 @@ var (
Usage: "(Work in progress): Enables the web portal for the validator client.",
Value: false,
}
// SSZOnly forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled
SSZOnly = &cli.BoolFlag{
Name: "ssz-only",
Usage: "(debug): Forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
@@ -225,7 +219,6 @@ var ValidatorFlags = append(deprecatedFlags, []cli.Flag{
EnableBeaconRESTApi,
DisableDutiesV2,
EnableWebFlag,
SSZOnly,
}...)
// E2EValidatorFlags contains a list of the validator feature flags to be tested in E2E.

View File

@@ -4,6 +4,10 @@ import (
"testing"
)
const (
EnvNameOverrideAccept = "PRYSM_API_OVERRIDE_ACCEPT"
)
// SetupTestConfigCleanup preserves configurations allowing to modify them within tests without any
// restrictions, everything is restored after the test.
func SetupTestConfigCleanup(t testing.TB) {

View File

@@ -27,7 +27,7 @@ type VersionedUnmarshaler struct {
// Fork aligns with the fork names in config/params/values.go
Fork int
// Version corresponds to the Version type defined in the beacon-chain spec, aka a "fork version number":
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#custom-types
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#custom-types
Version [fieldparams.VersionLength]byte
}

View File

@@ -73,7 +73,7 @@ message BeaconBlockBody {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4

View File

@@ -171,7 +171,7 @@ message BeaconBlockBody {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -402,7 +402,7 @@ message BeaconBlockBodyAltair {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -487,7 +487,7 @@ message BeaconBlockBodyBellatrix {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -629,7 +629,7 @@ message BeaconBlockBodyCapella {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -813,7 +813,7 @@ message BeaconBlockBodyDeneb {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/core/0_beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4
@@ -1040,7 +1040,7 @@ message BeaconBlockBodyElectra {
// Block operations
// Refer to spec constants at
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#max-operations-per-block
// https://github.com/ethereum/consensus-specs/blob/master/specs/core/0_beacon-chain.md#max-operations-per-block
// At most MAX_PROPOSER_SLASHINGS.
repeated ProposerSlashing proposer_slashings = 4

View File

@@ -100,7 +100,7 @@ service BeaconChain {
// that it was included in a block. The attestation may have expired.
// Refer to the Ethereum Beacon Chain specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
rpc AttestationPool(AttestationPoolRequest)
returns (AttestationPoolResponse) {
option deprecated = true;
@@ -117,7 +117,7 @@ service BeaconChain {
// that it was included in a block. The attestation may have expired.
// Refer to the Ethereum Beacon Chain specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
rpc AttestationPoolElectra(AttestationPoolRequest)
returns (AttestationPoolElectraResponse) {
option deprecated = true;

View File

@@ -191,7 +191,7 @@ message PowBlock {
// The beacon state for Altair hard fork 1.
// Reference:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/beacon-chain.md#beaconstate
// https://github.com/ethereum/consensus-specs/blob/master/specs/altair/beacon-chain.md#beaconstate
message BeaconStateAltair {
// Versioning [1001-2000]
uint64 genesis_time = 1001;
@@ -747,4 +747,4 @@ message BeaconStateFulu {
// Fields introduced in Fulu fork [13001-14000]
repeated uint64 proposer_lookahead = 13001
[ (ethereum.eth.ext.ssz_size) = "proposer_lookahead_size" ];
}
}

View File

@@ -248,9 +248,6 @@ func (v *ValidatorNode) Start(ctx context.Context) error {
args = append(args,
fmt.Sprintf("--%s=http://localhost:%d", flags.BeaconRESTApiProviderFlag.Name, beaconRestApiPort),
fmt.Sprintf("--%s", features.EnableBeaconRESTApi.Name))
if v.config.UseSSZOnly {
args = append(args, fmt.Sprintf("--%s", features.SSZOnly.Name))
}
}
// Only apply e2e flags to the current branch. New flags may not exist in previous release.

View File

@@ -25,14 +25,14 @@ func TestEndToEnd_MinimalConfig_Web3Signer_PersistentKeys(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithRemoteSignerAndPersistentKeysFile()).run()
}
func TestEndToEnd_MinimalConfig_ValidatorRESTApi(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi()).run()
}
func TestEndToEnd_MinimalConfig_ValidatorRESTApi_SSZ(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi(), types.WithSSZOnly()).run()
}
func TestEndToEnd_MinimalConfig_ValidatorRESTApi(t *testing.T) {
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi()).run()
}
func TestEndToEnd_ScenarioRun_EEOffline(t *testing.T) {
t.Skip("TODO(#10242) Prysm is current unable to handle an offline e2e")
cfg := types.InitForkCfg(version.Bellatrix, version.Deneb, params.E2ETestConfig())

View File

@@ -11,9 +11,11 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v6/testing/endtoend/types",
visibility = ["//testing/endtoend:__subpackages__"],
deps = [
"//api:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//runtime/version:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)

View File

@@ -6,9 +6,11 @@ import (
"context"
"os"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
@@ -51,18 +53,20 @@ func WithValidatorRESTApi() E2EConfigOpt {
}
}
func WithSSZOnly() E2EConfigOpt {
return func(cfg *E2EConfig) {
cfg.UseSSZOnly = true
}
}
func WithBuilder() E2EConfigOpt {
return func(cfg *E2EConfig) {
cfg.UseBuilder = true
}
}
func WithSSZOnly() E2EConfigOpt {
return func(cfg *E2EConfig) {
if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
logrus.Fatal(err)
}
}
}
// E2EConfig defines the struct for all configurations needed for E2E testing.
type E2EConfig struct {
TestCheckpointSync bool
@@ -76,7 +80,6 @@ type E2EConfig struct {
UseFixedPeerIDs bool
UseValidatorCrossClient bool
UseBeaconRestApi bool
UseSSZOnly bool
UseBuilder bool
EpochsToRun uint64
Seed int64

View File

@@ -46,7 +46,6 @@ go_library(
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
@@ -128,7 +127,6 @@ go_test(
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/rpc/eth/shared/testing:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/validator:go_default_library",

View File

@@ -9,7 +9,6 @@ import (
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/assert"
@@ -215,11 +214,6 @@ func TestGetBeaconBlock_SSZ_BellatrixValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -262,11 +256,6 @@ func TestGetBeaconBlock_SSZ_BlindedBellatrixValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -309,11 +298,6 @@ func TestGetBeaconBlock_SSZ_CapellaValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoCapellaBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -356,11 +340,6 @@ func TestGetBeaconBlock_SSZ_BlindedCapellaValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedCapellaBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -403,11 +382,6 @@ func TestGetBeaconBlock_SSZ_DenebValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoDenebBeaconBlockContents()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -450,11 +424,6 @@ func TestGetBeaconBlock_SSZ_BlindedDenebValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedDenebBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -497,11 +466,6 @@ func TestGetBeaconBlock_SSZ_ElectraValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoElectraBeaconBlockContents()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -544,11 +508,6 @@ func TestGetBeaconBlock_SSZ_BlindedElectraValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBlindedElectraBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -591,11 +550,6 @@ func TestGetBeaconBlock_SSZ_UnsupportedVersion(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
const slot = primitives.Slot(1)
randaoReveal := []byte{2}
graffiti := []byte{3}
@@ -625,11 +579,6 @@ func TestGetBeaconBlock_SSZ_InvalidBlindedHeader(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -663,11 +612,6 @@ func TestGetBeaconBlock_SSZ_InvalidVersionHeader(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -701,11 +645,6 @@ func TestGetBeaconBlock_SSZ_GetSSZError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
const slot = primitives.Slot(1)
randaoReveal := []byte{2}
graffiti := []byte{3}
@@ -731,11 +670,6 @@ func TestGetBeaconBlock_SSZ_Phase0Valid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoPhase0BeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)
@@ -778,11 +712,6 @@ func TestGetBeaconBlock_SSZ_AltairValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
resetFn := features.InitWithReset(&features.Flags{
SSZOnly: true,
})
defer resetFn()
proto := testhelpers.GenerateProtoAltairBeaconBlock()
bytes, err := proto.MarshalSSZ()
require.NoError(t, err)

View File

@@ -7,15 +7,19 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/api/apiutil"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/network/httputil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type reqOption func(*http.Request)
type RestHandler interface {
Get(ctx context.Context, endpoint string, resp interface{}) error
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
@@ -26,16 +30,30 @@ type RestHandler interface {
}
type BeaconApiRestHandler struct {
client http.Client
host string
client http.Client
host string
reqOverrides []reqOption
}
// NewBeaconApiRestHandler returns a RestHandler
func NewBeaconApiRestHandler(client http.Client, host string) RestHandler {
return &BeaconApiRestHandler{
brh := &BeaconApiRestHandler{
client: client,
host: host,
}
brh.appendAcceptOverride()
return brh
}
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that Prysm may use for performance testing or debugging,
// but which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *BeaconApiRestHandler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
})
}
}
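
As a usage sketch, forcing SSZ-only responses without any CLI flag; the helper name and endpoint URL are illustrative, and note that the env var must be set before the handler is constructed, since the override is captured in NewBeaconApiRestHandler:

// Illustrative only: every subsequent GetSSZ call sends the overridden Accept header.
// Assumed imports: net/http, os, time, plus the api and params packages used above.
func newSSZOnlyHandler() RestHandler {
	if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
		panic(err) // example only; real code should handle this error
	}
	return NewBeaconApiRestHandler(http.Client{Timeout: 5 * time.Second}, "http://localhost:3500")
}
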
// HttpClient returns the underlying HTTP client of the handler
@@ -56,7 +74,6 @@ func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp in
if err != nil {
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
httpResp, err := c.client.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
@@ -76,13 +93,16 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
primaryAcceptType := fmt.Sprintf("%s;q=%s", api.OctetStreamMediaType, "0.95")
secondaryAcceptType := fmt.Sprintf("%s;q=%s", api.JsonMediaType, "0.9")
acceptHeaderString := fmt.Sprintf("%s,%s", primaryAcceptType, secondaryAcceptType)
if features.Get().SSZOnly {
acceptHeaderString = api.OctetStreamMediaType
}
req.Header.Set("Accept", acceptHeaderString)
for _, o := range c.reqOverrides {
o(req)
}
httpResp, err := c.client.Do(req)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
@@ -92,16 +112,17 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
return
}
}()
accept := req.Header.Get("Accept")
contentType := httpResp.Header.Get("Content-Type")
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
}
if !strings.Contains(primaryAcceptType, contentType) {
if !apiutil.PrimaryAcceptMatches(accept, contentType) {
log.WithFields(logrus.Fields{
"primaryAcceptType": primaryAcceptType,
"secondaryAcceptType": secondaryAcceptType,
"receivedAcceptType": contentType,
"Accept": accept,
"Content-Type": contentType,
}).Debug("Server responded with non primary accept type")
}
@@ -115,10 +136,6 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
return nil, nil, errorJson
}
if features.Get().SSZOnly && contentType != api.OctetStreamMediaType {
return nil, nil, errors.Errorf("server responded with non primary accept type %s", contentType)
}
return body, httpResp.Header, nil
}
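
For reference, what the default (non-overridden) Accept header assembled above evaluates to, assuming api.OctetStreamMediaType is "application/octet-stream" and api.JsonMediaType is "application/json":

// Illustrative only.
primary := "application/octet-stream;q=0.95"
secondary := "application/json;q=0.9"
accept := primary + "," + secondary
// accept == "application/octet-stream;q=0.95,application/json;q=0.9"
// apiutil.PrimaryAcceptMatches(accept, "application/json") would then report false
// and trigger the debug log above (behavior inferred from its use in this hunk).
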

View File

@@ -7,11 +7,13 @@ import (
"io"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/network/httputil"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
@@ -143,6 +145,25 @@ func TestGetSSZ(t *testing.T) {
})
}
func TestAcceptOverrideSSZ(t *testing.T) {
name := "TestAcceptOverride"
orig := os.Getenv(params.EnvNameOverrideAccept)
defer func() {
require.NoError(t, os.Setenv(params.EnvNameOverrideAccept, orig))
}()
require.NoError(t, os.Setenv(params.EnvNameOverrideAccept, name))
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, name, r.Header.Get("Accept"))
w.WriteHeader(200)
_, err := w.Write([]byte("ok"))
require.NoError(t, err)
}))
defer srv.Close()
c := NewBeaconApiRestHandler(http.Client{Timeout: time.Second * 5}, srv.URL)
_, _, err := c.GetSSZ(t.Context(), "/test")
require.NoError(t, err)
}
func TestPost(t *testing.T) {
ctx := t.Context()
const endpoint = "/example/rest/api/endpoint"