mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
4 Commits
fusaka-dev
...
fix-propos
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c3375d87c | ||
|
|
856742ff68 | ||
|
|
abe16a9cb4 | ||
|
|
77958022e7 |
@@ -2,18 +2,28 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["common.go"],
|
||||
srcs = [
|
||||
"common.go",
|
||||
"header.go",
|
||||
],
|
||||
importpath = "github.com/OffchainLabs/prysm/v6/api/apiutil",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//consensus-types/primitives:go_default_library"],
|
||||
deps = [
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["common_test.go"],
|
||||
srcs = [
|
||||
"common_test.go",
|
||||
"header_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
122
api/apiutil/header.go
Normal file
122
api/apiutil/header.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package apiutil
|
||||
|
||||
import (
|
||||
"mime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type mediaRange struct {
|
||||
mt string // canonicalised media‑type, e.g. "application/json"
|
||||
q float64 // quality factor (0‑1)
|
||||
raw string // original string – useful for logging/debugging
|
||||
spec int // 2=exact, 1=type/*, 0=*/*
|
||||
}
|
||||
|
||||
func parseMediaRange(field string) (mediaRange, bool) {
|
||||
field = strings.TrimSpace(field)
|
||||
|
||||
mt, params, err := mime.ParseMediaType(field)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to parse header field")
|
||||
return mediaRange{}, false
|
||||
}
|
||||
|
||||
r := mediaRange{mt: mt, q: 1, spec: 2, raw: field}
|
||||
|
||||
if qs, ok := params["q"]; ok {
|
||||
v, err := strconv.ParseFloat(qs, 64)
|
||||
if err != nil || v < 0 || v > 1 {
|
||||
log.WithField("q", qs).Debug("Invalid quality factor (0‑1)")
|
||||
return mediaRange{}, false // skip invalid entry
|
||||
}
|
||||
r.q = v
|
||||
}
|
||||
|
||||
switch {
|
||||
case mt == "*/*":
|
||||
r.spec = 0
|
||||
case strings.HasSuffix(mt, "/*"):
|
||||
r.spec = 1
|
||||
}
|
||||
return r, true
|
||||
}
|
||||
|
||||
func hasExplicitQ(r mediaRange) bool {
|
||||
return strings.Contains(strings.ToLower(r.raw), ";q=")
|
||||
}
|
||||
|
||||
// ParseAccept returns media ranges sorted by q (desc) then specificity.
|
||||
func ParseAccept(header string) []mediaRange {
|
||||
if header == "" {
|
||||
return []mediaRange{{mt: "*/*", q: 1, spec: 0, raw: "*/*"}}
|
||||
}
|
||||
|
||||
var out []mediaRange
|
||||
for _, field := range strings.Split(header, ",") {
|
||||
if r, ok := parseMediaRange(field); ok {
|
||||
out = append(out, r)
|
||||
}
|
||||
}
|
||||
|
||||
sort.SliceStable(out, func(i, j int) bool {
|
||||
ei, ej := hasExplicitQ(out[i]), hasExplicitQ(out[j])
|
||||
if ei != ej {
|
||||
return ei // explicit beats implicit
|
||||
}
|
||||
if out[i].q != out[j].q {
|
||||
return out[i].q > out[j].q
|
||||
}
|
||||
return out[i].spec > out[j].spec
|
||||
})
|
||||
return out
|
||||
}
|
||||
|
||||
// Matches reports whether content type is acceptable per the header.
|
||||
func Matches(header, ct string) bool {
|
||||
for _, r := range ParseAccept(header) {
|
||||
switch {
|
||||
case r.q == 0:
|
||||
continue
|
||||
case r.mt == "*/*":
|
||||
return true
|
||||
case strings.HasSuffix(r.mt, "/*"):
|
||||
if strings.HasPrefix(ct, r.mt[:len(r.mt)-1]) {
|
||||
return true
|
||||
}
|
||||
case r.mt == ct:
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Negotiate selects the best server type according to the header.
|
||||
// Returns the chosen type and true, or "", false when nothing matches.
|
||||
func Negotiate(header string, serverTypes []string) (string, bool) {
|
||||
for _, r := range ParseAccept(header) {
|
||||
if r.q == 0 {
|
||||
continue
|
||||
}
|
||||
for _, s := range serverTypes {
|
||||
if Matches(r.mt, s) {
|
||||
return s, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// PrimaryAcceptMatches only checks if the first accept matches
|
||||
func PrimaryAcceptMatches(header, produced string) bool {
|
||||
for _, r := range ParseAccept(header) {
|
||||
if r.q == 0 {
|
||||
continue // explicitly unacceptable – skip
|
||||
}
|
||||
return Matches(r.mt, produced)
|
||||
}
|
||||
return false
|
||||
}
|
||||
174
api/apiutil/header_test.go
Normal file
174
api/apiutil/header_test.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package apiutil
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
)
|
||||
|
||||
func TestParseAccept(t *testing.T) {
|
||||
type want struct {
|
||||
mt string
|
||||
q float64
|
||||
spec int
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
header string
|
||||
want []want
|
||||
}{
|
||||
{
|
||||
name: "empty header becomes */*;q=1",
|
||||
header: "",
|
||||
want: []want{{mt: "*/*", q: 1, spec: 0}},
|
||||
},
|
||||
{
|
||||
name: "quality ordering then specificity",
|
||||
header: "application/json;q=0.2, */*;q=0.1, application/xml;q=0.5, text/*;q=0.5",
|
||||
want: []want{
|
||||
{mt: "application/xml", q: 0.5, spec: 2},
|
||||
{mt: "text/*", q: 0.5, spec: 1},
|
||||
{mt: "application/json", q: 0.2, spec: 2},
|
||||
{mt: "*/*", q: 0.1, spec: 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid pieces are skipped",
|
||||
header: "text/plain; q=boom, application/json",
|
||||
want: []want{{mt: "application/json", q: 1, spec: 2}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := ParseAccept(tc.header)
|
||||
gotProjected := make([]want, len(got))
|
||||
for i, g := range got {
|
||||
gotProjected[i] = want{mt: g.mt, q: g.q, spec: g.spec}
|
||||
}
|
||||
require.DeepEqual(t, gotProjected, tc.want)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatches(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
accept string
|
||||
ct string
|
||||
matches bool
|
||||
}{
|
||||
{"exact match", "application/json", "application/json", true},
|
||||
{"type wildcard", "application/*;q=0.8", "application/xml", true},
|
||||
{"global wildcard", "*/*;q=0.1", "image/png", true},
|
||||
{"explicitly unacceptable (q=0)", "text/*;q=0", "text/plain", false},
|
||||
{"no match", "image/png", "application/json", false},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := Matches(tc.accept, tc.ct)
|
||||
require.Equal(t, tc.matches, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNegotiate(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
accept string
|
||||
serverTypes []string
|
||||
wantType string
|
||||
ok bool
|
||||
}{
|
||||
{
|
||||
name: "highest quality wins",
|
||||
accept: "application/json;q=0.8,application/xml;q=0.9",
|
||||
serverTypes: []string{"application/json", "application/xml"},
|
||||
wantType: "application/xml",
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "wildcard matches first server type",
|
||||
accept: "*/*;q=0.5",
|
||||
serverTypes: []string{"application/octet-stream", "application/json"},
|
||||
wantType: "application/octet-stream",
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "no acceptable type",
|
||||
accept: "image/png",
|
||||
serverTypes: []string{"application/json"},
|
||||
wantType: "",
|
||||
ok: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got, ok := Negotiate(tc.accept, tc.serverTypes)
|
||||
require.Equal(t, tc.ok, ok)
|
||||
require.Equal(t, tc.wantType, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrimaryAcceptMatches(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
accept string
|
||||
produced string
|
||||
expect bool
|
||||
}{
|
||||
{
|
||||
name: "prefers json",
|
||||
accept: "application/json;q=0.9,application/xml",
|
||||
produced: "application/json",
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
name: "wildcard application beats other wildcard",
|
||||
accept: "application/*;q=0.2,*/*;q=0.1",
|
||||
produced: "application/xml",
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
name: "json wins",
|
||||
accept: "application/xml;q=0.8,application/json;q=0.9",
|
||||
produced: "application/json",
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
name: "json loses",
|
||||
accept: "application/xml;q=0.8,application/json;q=0.9,application/octet-stream;q=0.99",
|
||||
produced: "application/json",
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
name: "json wins with non q option",
|
||||
accept: "application/xml;q=0.8,image/png,application/json;q=0.9",
|
||||
produced: "application/json",
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
name: "json not primary",
|
||||
accept: "image/png,application/json",
|
||||
produced: "application/json",
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
name: "absent header",
|
||||
accept: "",
|
||||
produced: "text/plain",
|
||||
expect: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := PrimaryAcceptMatches(tc.accept, tc.produced)
|
||||
require.Equal(t, got, tc.expect)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//api/apiutil:go_default_library",
|
||||
"@com_github_rs_cors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/api"
|
||||
"github.com/OffchainLabs/prysm/v6/api/apiutil"
|
||||
"github.com/rs/cors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -74,42 +75,10 @@ func ContentTypeHandler(acceptedMediaTypes []string) Middleware {
|
||||
func AcceptHeaderHandler(serverAcceptedTypes []string) Middleware {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
acceptHeader := r.Header.Get("Accept")
|
||||
// header is optional and should skip if not provided
|
||||
if acceptHeader == "" {
|
||||
next.ServeHTTP(w, r)
|
||||
if _, ok := apiutil.Negotiate(r.Header.Get("Accept"), serverAcceptedTypes); !ok {
|
||||
http.Error(w, "Not Acceptable", http.StatusNotAcceptable)
|
||||
return
|
||||
}
|
||||
|
||||
accepted := false
|
||||
acceptTypes := strings.Split(acceptHeader, ",")
|
||||
// follows rules defined in https://datatracker.ietf.org/doc/html/rfc2616#section-14.1
|
||||
for _, acceptType := range acceptTypes {
|
||||
acceptType = strings.TrimSpace(acceptType)
|
||||
if acceptType == "*/*" {
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
for _, serverAcceptedType := range serverAcceptedTypes {
|
||||
if strings.HasPrefix(acceptType, serverAcceptedType) {
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
if acceptType != "/*" && strings.HasSuffix(acceptType, "/*") && strings.HasPrefix(serverAcceptedType, acceptType[:len(acceptType)-2]) {
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if accepted {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !accepted {
|
||||
http.Error(w, fmt.Sprintf("Not Acceptable: %s", acceptHeader), http.StatusNotAcceptable)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/presets/mainnet/trusted_setups/trusted_setup_4096.json
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/presets/mainnet/trusted_setups/trusted_setup_4096.json
|
||||
//go:embed trusted_setup_4096.json
|
||||
embeddedTrustedSetup []byte // 1.2Mb
|
||||
kzgContext *GoKZG.Context
|
||||
|
||||
@@ -177,7 +177,7 @@ func (s *Service) processAttestations(ctx context.Context, disparity time.Durati
|
||||
for _, a := range atts {
|
||||
// Based on the spec, don't process the attestation until the subsequent slot.
|
||||
// This delays consideration in the fork choice until their slot is in the past.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#validate_on_attestation
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/fork-choice.md#validate_on_attestation
|
||||
nextSlot := a.GetData().Slot + 1
|
||||
if err := slots.VerifyTime(s.genesisTime, nextSlot, disparity); err != nil {
|
||||
continue
|
||||
|
||||
@@ -206,9 +206,9 @@ func ParseWeakSubjectivityInputString(wsCheckpointString string) (*v1alpha1.Chec
|
||||
// MinEpochsForBlockRequests computes the number of epochs of block history that we need to maintain,
|
||||
// relative to the current epoch, per the p2p specs. This is used to compute the slot where backfill is complete.
|
||||
// value defined:
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#configuration
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#configuration
|
||||
// MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2 (= 33024, ~5 months)
|
||||
// detailed rationale: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
|
||||
// detailed rationale: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
|
||||
func MinEpochsForBlockRequests() primitives.Epoch {
|
||||
return params.BeaconConfig().MinValidatorWithdrawabilityDelay +
|
||||
primitives.Epoch(params.BeaconConfig().ChurnLimitQuotient/2)
|
||||
|
||||
@@ -292,7 +292,7 @@ func TestMinEpochsForBlockRequests(t *testing.T) {
|
||||
params.SetActiveTestCleanup(t, params.MainnetConfig())
|
||||
var expected primitives.Epoch = 33024
|
||||
// expected value of 33024 via spec commentary:
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#why-are-blocksbyrange-requests-only-required-to-be-served-for-the-latest-min_epochs_for_block_requests-epochs
|
||||
// MIN_EPOCHS_FOR_BLOCK_REQUESTS is calculated using the arithmetic from compute_weak_subjectivity_period found in the weak subjectivity guide. Specifically to find this max epoch range, we use the worst case event of a very large validator size (>= MIN_PER_EPOCH_CHURN_LIMIT * CHURN_LIMIT_QUOTIENT).
|
||||
//
|
||||
// MIN_EPOCHS_FOR_BLOCK_REQUESTS = (
|
||||
|
||||
@@ -126,7 +126,7 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, curre
|
||||
return errors.Wrap(err, "entry filter")
|
||||
}
|
||||
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
|
||||
verifier := s.newDataColumnsVerifier(roDataColumns, verification.ByRangeRequestDataColumnSidecarRequirements)
|
||||
|
||||
if err := verifier.ValidFields(); err != nil {
|
||||
|
||||
@@ -75,7 +75,7 @@ data-columns
|
||||
|
||||
Computation of the maximum size of a DataColumnSidecar
|
||||
------------------------------------------------------
|
||||
https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/das-core.md#datacolumnsidecar
|
||||
https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#datacolumnsidecar
|
||||
|
||||
|
||||
class DataColumnSidecar(Container):
|
||||
|
||||
@@ -104,6 +104,7 @@ go_library(
|
||||
"@com_github_libp2p_go_mplex//:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr//:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr//net:go_default_library",
|
||||
"@com_github_patrickmn_go_cache//:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
/*
|
||||
Package p2p implements the Ethereum consensus networking specification.
|
||||
|
||||
Canonical spec reference: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md
|
||||
Canonical spec reference: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md
|
||||
|
||||
Prysm specific implementation design docs
|
||||
- Networking Design Doc: https://docs.google.com/document/d/1VyhobQRkEjEkEPxmmdWvaHfKWn0j6dEae_wLZlrFtfU/view
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -79,8 +80,8 @@ func (s *Service) disconnectFromPeerOnError(
|
||||
// and validating the response from the peer.
|
||||
func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Context, id peer.ID) error) {
|
||||
// Peer map and lock to keep track of current connection attempts.
|
||||
var peerLock sync.Mutex
|
||||
peerMap := make(map[peer.ID]bool)
|
||||
peerLock := new(sync.Mutex)
|
||||
|
||||
// This is run at the start of each connection attempt, to ensure
|
||||
// that there aren't multiple inflight connection requests for the
|
||||
@@ -108,6 +109,19 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
|
||||
s.host.Network().Notify(&network.NotifyBundle{
|
||||
ConnectedF: func(_ network.Network, conn network.Conn) {
|
||||
remotePeer := conn.RemotePeer()
|
||||
log := log.WithField("peer", remotePeer)
|
||||
direction := conn.Stat().Direction
|
||||
|
||||
// For some reason, right after a disconnection, this `ConnectedF` callback
|
||||
// is called. We want to avoid processing this connection if the peer was
|
||||
// disconnected too recently and if we are at the initiative of this connection.
|
||||
// This is very probably a bug in libp2p.
|
||||
if direction == network.DirOutbound {
|
||||
if err := s.wasDisconnectedTooRecently(remotePeer); err != nil {
|
||||
log.WithError(err).Debug("Skipping connection handler")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Connection handler must be non-blocking as part of libp2p design.
|
||||
go func() {
|
||||
@@ -133,53 +147,56 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
|
||||
return
|
||||
}
|
||||
|
||||
// Do not perform handshake on inbound dials.
|
||||
if conn.Stat().Direction == network.DirInbound {
|
||||
_, err := s.peers.ChainState(remotePeer)
|
||||
peerExists := err == nil
|
||||
currentTime := prysmTime.Now()
|
||||
|
||||
// Wait for peer to initiate handshake
|
||||
time.Sleep(timeForStatus)
|
||||
|
||||
// Exit if we are disconnected with the peer.
|
||||
if s.host.Network().Connectedness(remotePeer) != network.Connected {
|
||||
if direction != network.DirInbound {
|
||||
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
|
||||
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
|
||||
return
|
||||
}
|
||||
|
||||
// If peer hasn't sent a status request, we disconnect with them
|
||||
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
|
||||
statusMessageMissing.Inc()
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
|
||||
return
|
||||
}
|
||||
|
||||
if peerExists {
|
||||
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
|
||||
if err != nil {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
|
||||
return
|
||||
}
|
||||
|
||||
// Exit if we don't receive any current status messages from peer.
|
||||
if updated.IsZero() {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
|
||||
return
|
||||
}
|
||||
|
||||
if updated.Before(currentTime) {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
s.connectToPeer(conn)
|
||||
return
|
||||
}
|
||||
|
||||
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
|
||||
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
|
||||
// The connection is inbound.
|
||||
_, err = s.peers.ChainState(remotePeer)
|
||||
peerExists := err == nil
|
||||
currentTime := prysmTime.Now()
|
||||
|
||||
// Wait for peer to initiate handshake
|
||||
time.Sleep(timeForStatus)
|
||||
|
||||
// Exit if we are disconnected with the peer.
|
||||
if s.host.Network().Connectedness(remotePeer) != network.Connected {
|
||||
return
|
||||
}
|
||||
|
||||
// If peer hasn't sent a status request, we disconnect with them
|
||||
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
|
||||
statusMessageMissing.Inc()
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
|
||||
return
|
||||
}
|
||||
|
||||
if !peerExists {
|
||||
s.connectToPeer(conn)
|
||||
return
|
||||
}
|
||||
|
||||
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
|
||||
if err != nil {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
|
||||
return
|
||||
}
|
||||
|
||||
// Exit if we don't receive any current status messages from peer.
|
||||
if updated.IsZero() {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
|
||||
return
|
||||
}
|
||||
|
||||
if updated.Before(currentTime) {
|
||||
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -220,6 +237,12 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
|
||||
}
|
||||
|
||||
s.peers.SetConnectionState(peerID, peers.Disconnected)
|
||||
if err := s.peerDisconnectionTime.Add(peerID.String(), time.Now(), cache.DefaultExpiration); err != nil {
|
||||
// The `DisconnectedF` funcition already called for this peer less than `cache.DefaultExpiration` ago. Skip.
|
||||
// (Very probably a bug in libp2p.)
|
||||
log.WithError(err).Trace("Failed to set peer disconnection time")
|
||||
return
|
||||
}
|
||||
|
||||
// Only log disconnections if we were fully connected.
|
||||
if priorState == peers.Connected {
|
||||
@@ -231,6 +254,28 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
|
||||
})
|
||||
}
|
||||
|
||||
// wasDisconnectedTooRecently checks if the peer was disconnected within the last second.
|
||||
func (s *Service) wasDisconnectedTooRecently(peerID peer.ID) error {
|
||||
const disconnectionDurationThreshold = 1 * time.Second
|
||||
|
||||
peerDisconnectionTimeObj, ok := s.peerDisconnectionTime.Get(peerID.String())
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
peerDisconnectionTime, ok := peerDisconnectionTimeObj.(time.Time)
|
||||
if !ok {
|
||||
return errors.New("invalid peer disconnection time type")
|
||||
}
|
||||
|
||||
timeSinceDisconnection := time.Since(peerDisconnectionTime)
|
||||
if timeSinceDisconnection < disconnectionDurationThreshold {
|
||||
return errors.Errorf("peer %s was disconnected too recently: %s", peerID, timeSinceDisconnection)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func agentString(pid peer.ID, hst host.Host) string {
|
||||
rawVersion, storeErr := hst.Peerstore().Get(pid, agentVersionKey)
|
||||
|
||||
|
||||
@@ -101,21 +101,24 @@ func (s *BadResponsesScorer) countNoLock(pid peer.ID) (int, error) {
|
||||
|
||||
// Increment increments the number of bad responses we have received from the given remote peer.
|
||||
// If peer doesn't exist this method is no-op.
|
||||
func (s *BadResponsesScorer) Increment(pid peer.ID) {
|
||||
func (s *BadResponsesScorer) Increment(pid peer.ID) int {
|
||||
if pid == "" {
|
||||
return
|
||||
return 0
|
||||
}
|
||||
|
||||
s.store.Lock()
|
||||
defer s.store.Unlock()
|
||||
|
||||
peerData, ok := s.store.PeerData(pid)
|
||||
if !ok {
|
||||
s.store.SetPeerData(pid, &peerdata.PeerData{
|
||||
BadResponses: 1,
|
||||
})
|
||||
return
|
||||
if ok {
|
||||
peerData.BadResponses++
|
||||
return peerData.BadResponses
|
||||
}
|
||||
peerData.BadResponses++
|
||||
|
||||
const badResponses = 1
|
||||
peerData = &peerdata.PeerData{BadResponses: badResponses}
|
||||
s.store.SetPeerData(pid, peerData)
|
||||
return badResponses
|
||||
}
|
||||
|
||||
// IsBadPeer states if the peer is to be considered bad.
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -86,6 +87,7 @@ type Service struct {
|
||||
genesisTime time.Time
|
||||
genesisValidatorsRoot []byte
|
||||
activeValidatorCount uint64
|
||||
peerDisconnectionTime *cache.Cache
|
||||
}
|
||||
|
||||
// NewService initializes a new p2p service compatible with shared.Service interface. No
|
||||
@@ -115,16 +117,17 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
||||
ipLimiter := leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */)
|
||||
|
||||
s := &Service{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
cfg: cfg,
|
||||
addrFilter: addrFilter,
|
||||
ipLimiter: ipLimiter,
|
||||
privKey: privKey,
|
||||
metaData: metaData,
|
||||
isPreGenesis: true,
|
||||
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
|
||||
subnetsLock: make(map[uint64]*sync.RWMutex),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
cfg: cfg,
|
||||
addrFilter: addrFilter,
|
||||
ipLimiter: ipLimiter,
|
||||
privKey: privKey,
|
||||
metaData: metaData,
|
||||
isPreGenesis: true,
|
||||
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
|
||||
subnetsLock: make(map[uint64]*sync.RWMutex),
|
||||
peerDisconnectionTime: cache.New(1*time.Second, 1*time.Minute),
|
||||
}
|
||||
|
||||
ipAddr := prysmnetwork.IPAddr()
|
||||
@@ -486,14 +489,17 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error
|
||||
if info.ID == s.host.ID() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.Peers().IsBad(info.ID); err != nil {
|
||||
return errors.Wrap(err, "refused to connect to bad peer")
|
||||
return errors.Wrap(err, "bad peer")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
if err := s.host.Connect(ctx, info); err != nil {
|
||||
s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
|
||||
return err
|
||||
s.downscorePeer(info.ID, "connectionError")
|
||||
return errors.Wrap(err, "peer connect")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -524,3 +530,8 @@ func (s *Service) connectToBootnodes() error {
|
||||
func (s *Service) isInitialized() bool {
|
||||
return !s.genesisTime.IsZero() && len(s.genesisValidatorsRoot) == 32
|
||||
}
|
||||
|
||||
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
|
||||
newScore := s.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
|
||||
}
|
||||
|
||||
@@ -403,7 +403,7 @@ func TestService_connectWithPeer(t *testing.T) {
|
||||
return ps
|
||||
}(),
|
||||
info: peer.AddrInfo{ID: "bad"},
|
||||
wantErr: "refused to connect to bad peer",
|
||||
wantErr: "bad peer",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
||||
@@ -181,6 +181,11 @@ func (s *Service) findPeersWithSubnets(
|
||||
// Get all needed subnets that the node is subscribed to.
|
||||
// Skip nodes that are not subscribed to any of the defective subnets.
|
||||
node := iterator.Node()
|
||||
|
||||
if !s.filterPeer(node) {
|
||||
continue
|
||||
}
|
||||
|
||||
nodeSubnets, err := filter(node)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "filter node")
|
||||
|
||||
@@ -1029,8 +1029,8 @@ func (s *Server) GetProposerDuties(w http.ResponseWriter, r *http.Request) {
|
||||
httputil.HandleError(w, fmt.Sprintf("Could not get head state: %v ", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
// Advance state with empty transitions up to the requested epoch start slot.
|
||||
if st.Slot() < epochStartSlot {
|
||||
// Advance state with empty transitions up to the requested epoch start slot for pre fulu state only. Fulu state utilizes proposer look ahead field.
|
||||
if st.Slot() < epochStartSlot && st.Version() != version.Fulu {
|
||||
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
|
||||
if err != nil {
|
||||
httputil.HandleError(w, fmt.Sprintf("Could not get head root: %v ", err), http.StatusInternalServerError)
|
||||
|
||||
@@ -2645,6 +2645,78 @@ func TestGetProposerDuties(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetProposerDuties_FuluState(t *testing.T) {
|
||||
helpers.ClearCache()
|
||||
|
||||
// Create a Fulu state with slot 0 (before epoch 1 start slot which is 32)
|
||||
fuluState, err := util.NewBeaconStateFulu()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, fuluState.SetSlot(0)) // Set to slot 0
|
||||
|
||||
// Create some validators for the test
|
||||
depChainStart := params.BeaconConfig().MinGenesisActiveValidatorCount
|
||||
deposits, _, err := util.DeterministicDepositsAndKeys(depChainStart)
|
||||
require.NoError(t, err)
|
||||
|
||||
validators := make([]*ethpbalpha.Validator, len(deposits))
|
||||
for i, deposit := range deposits {
|
||||
validators[i] = ðpbalpha.Validator{
|
||||
PublicKey: deposit.Data.PublicKey,
|
||||
ActivationEpoch: 0,
|
||||
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
|
||||
WithdrawalCredentials: make([]byte, 32),
|
||||
}
|
||||
}
|
||||
require.NoError(t, fuluState.SetValidators(validators))
|
||||
|
||||
// Set up block roots
|
||||
genesis := util.NewBeaconBlock()
|
||||
genesisRoot, err := genesis.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roots := make([][]byte, fieldparams.BlockRootsLength)
|
||||
roots[0] = genesisRoot[:]
|
||||
require.NoError(t, fuluState.SetBlockRoots(roots))
|
||||
|
||||
chainSlot := primitives.Slot(0)
|
||||
chain := &mockChain.ChainService{
|
||||
State: fuluState, Root: genesisRoot[:], Slot: &chainSlot,
|
||||
}
|
||||
|
||||
db := dbutil.SetupDB(t)
|
||||
require.NoError(t, db.SaveGenesisBlockRoot(t.Context(), genesisRoot))
|
||||
|
||||
s := &Server{
|
||||
Stater: &testutil.MockStater{StatesBySlot: map[primitives.Slot]state.BeaconState{0: fuluState}},
|
||||
HeadFetcher: chain,
|
||||
TimeFetcher: chain,
|
||||
OptimisticModeFetcher: chain,
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
PayloadIDCache: cache.NewPayloadIDCache(),
|
||||
TrackedValidatorsCache: cache.NewTrackedValidatorsCache(),
|
||||
BeaconDB: db,
|
||||
}
|
||||
|
||||
// Request epoch 1 duties, which should require advancing from slot 0 to slot 32
|
||||
// But for Fulu state, this advancement should be skipped
|
||||
request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil)
|
||||
request.SetPathValue("epoch", "1")
|
||||
writer := httptest.NewRecorder()
|
||||
writer.Body = &bytes.Buffer{}
|
||||
|
||||
s.GetProposerDuties(writer, request)
|
||||
assert.Equal(t, http.StatusOK, writer.Code)
|
||||
|
||||
// Verify the state was not advanced - it should still be at slot 0
|
||||
// This is the key assertion for the regression test
|
||||
assert.Equal(t, primitives.Slot(0), fuluState.Slot(), "Fulu state should not have been advanced")
|
||||
|
||||
resp := &structs.GetProposerDutiesResponse{}
|
||||
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
|
||||
|
||||
// Should still return proposer duties despite not advancing the state
|
||||
assert.Equal(t, true, len(resp.Data) > 0, "Should return proposer duties even without state advancement")
|
||||
}
|
||||
|
||||
func TestGetSyncCommitteeDuties(t *testing.T) {
|
||||
helpers.ClearCache()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
|
||||
@@ -315,7 +315,7 @@ func (bs *Server) ListIndexedAttestationsElectra(
|
||||
// that it was included in a block. The attestation may have expired.
|
||||
// Refer to the ethereum consensus specification for more details on how
|
||||
// attestations are processed and when they are no longer valid.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
|
||||
func (bs *Server) AttestationPool(_ context.Context, req *ethpb.AttestationPoolRequest) (*ethpb.AttestationPoolResponse, error) {
|
||||
var atts []*ethpb.Attestation
|
||||
var err error
|
||||
|
||||
@@ -262,7 +262,7 @@ func (vs *Server) activationStatus(
|
||||
// It cannot faithfully attest to the head block of the chain, since it has not fully verified that block.
|
||||
//
|
||||
// Spec:
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/sync/optimistic.md
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/sync/optimistic.md
|
||||
func (vs *Server) optimisticStatus(ctx context.Context) error {
|
||||
if slots.ToEpoch(vs.TimeFetcher.CurrentSlot()) < params.BeaconConfig().BellatrixForkEpoch {
|
||||
return nil
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
@@ -211,7 +212,7 @@ func (s *Service) importBatches(ctx context.Context) {
|
||||
_, err := s.batchImporter(ctx, current, ib, s.store)
|
||||
if err != nil {
|
||||
log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import")
|
||||
s.downscore(ib)
|
||||
s.downscorePeer(ib.blockPid, "backfillBatchImportError")
|
||||
s.batchSeq.update(ib.withState(batchErrRetryable))
|
||||
// If a batch fails, the subsequent batches are no longer considered importable.
|
||||
break
|
||||
@@ -336,10 +337,6 @@ func (s *Service) initBatches() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) downscore(b batch) {
|
||||
s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid)
|
||||
}
|
||||
|
||||
func (*Service) Stop() error {
|
||||
return nil
|
||||
}
|
||||
@@ -383,3 +380,8 @@ func (s *Service) WaitForCompletion() error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
|
||||
newScore := s.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
|
||||
}
|
||||
|
||||
@@ -337,14 +337,15 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) {
|
||||
// Peer returned invalid data, penalize.
|
||||
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blocksFrom)
|
||||
log.WithField("pid", response.blocksFrom).Debug("Peer is penalized for invalid blocks")
|
||||
} else if errors.Is(response.err, verification.ErrBlobInvalid) {
|
||||
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.blobsFrom)
|
||||
log.WithField("pid", response.blobsFrom).Debug("Peer is penalized for invalid blob response")
|
||||
q.downscorePeer(response.blocksFrom, "invalidBlocks")
|
||||
}
|
||||
|
||||
if errors.Is(response.err, verification.ErrBlobInvalid) {
|
||||
q.downscorePeer(response.blobsFrom, "invalidBlobs")
|
||||
}
|
||||
|
||||
return m.state, response.err
|
||||
}
|
||||
m.fetched = *response
|
||||
@@ -455,6 +456,11 @@ func (q *blocksQueue) onProcessSkippedEvent(ctx context.Context) eventHandlerFn
|
||||
}
|
||||
}
|
||||
|
||||
func (q *blocksQueue) downscorePeer(peerID peer.ID, reason string) {
|
||||
newScore := q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
|
||||
}
|
||||
|
||||
// onCheckStaleEvent is an event that allows to mark stale epochs,
|
||||
// so that they can be re-processed.
|
||||
func onCheckStaleEvent(ctx context.Context) eventHandlerFn {
|
||||
|
||||
@@ -522,7 +522,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
|
||||
})
|
||||
assert.ErrorContains(t, beaconsync.ErrInvalidFetchedData.Error(), err)
|
||||
assert.Equal(t, stateScheduled, updatedState)
|
||||
assert.LogsContain(t, hook, "msg=\"Peer is penalized for invalid blocks\" pid=ZiCa")
|
||||
assert.LogsContain(t, hook, "Downscore peer")
|
||||
})
|
||||
|
||||
t.Run("transition ok", func(t *testing.T) {
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/paulbellamy/ratecounter"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -345,12 +346,11 @@ func isPunishableError(err error) bool {
|
||||
func (s *Service) updatePeerScorerStats(data *blocksQueueFetchedData, count uint64, err error) {
|
||||
if isPunishableError(err) {
|
||||
if verification.IsBlobValidationFailure(err) {
|
||||
log.WithError(err).WithField("peer_id", data.blobsFrom).Warn("Downscoring peer for invalid blobs")
|
||||
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blobsFrom)
|
||||
s.downscorePeer(data.blobsFrom, "invalidBlobs")
|
||||
} else {
|
||||
log.WithError(err).WithField("peer_id", data.blocksFrom).Warn("Downscoring peer for invalid blocks")
|
||||
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(data.blocksFrom)
|
||||
s.downscorePeer(data.blocksFrom, "invalidBlocks")
|
||||
}
|
||||
|
||||
// If the error is punishable, exit here so that we don't give them credit for providing bad blocks.
|
||||
return
|
||||
}
|
||||
@@ -376,3 +376,8 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
|
||||
newScore := s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/trailofbits/go-mutexasserts"
|
||||
@@ -122,7 +123,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
|
||||
|
||||
collector, err := l.retrieveCollector(topic)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "retrieve collector")
|
||||
}
|
||||
|
||||
remaining := collector.Remaining(remotePeer.String())
|
||||
@@ -131,7 +132,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
|
||||
amt = 1
|
||||
}
|
||||
if amt > uint64(remaining) {
|
||||
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
|
||||
l.downscorePeer(remotePeer, topic, "rateLimitExceeded")
|
||||
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
|
||||
return p2ptypes.ErrRateLimited
|
||||
}
|
||||
@@ -139,22 +140,20 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
|
||||
}
|
||||
|
||||
// This is used to validate all incoming rpc streams from external peers.
|
||||
func (l *limiter) validateRawRpcRequest(stream network.Stream) error {
|
||||
func (l *limiter) validateRawRpcRequest(stream network.Stream, amt uint64) error {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
topic := rpcLimiterTopic
|
||||
|
||||
collector, err := l.retrieveCollector(topic)
|
||||
remotePeer := stream.Conn().RemotePeer()
|
||||
collector, err := l.retrieveCollector(rpcLimiterTopic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := stream.Conn().RemotePeer().String()
|
||||
remaining := collector.Remaining(key)
|
||||
// Treat each request as a minimum of 1.
|
||||
amt := int64(1)
|
||||
if amt > remaining {
|
||||
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
|
||||
if amt > uint64(remaining) {
|
||||
l.downscorePeer(remotePeer, rpcLimiterTopic, "rawRateLimitExceeded")
|
||||
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
|
||||
return p2ptypes.ErrRateLimited
|
||||
}
|
||||
@@ -233,3 +232,13 @@ func (l *limiter) retrieveCollector(topic string) (*leakybucket.Collector, error
|
||||
func (_ *limiter) topicLogger(topic string) *logrus.Entry {
|
||||
return log.WithField("rateLimiter", topic)
|
||||
}
|
||||
|
||||
func (l *limiter) downscorePeer(peerID peer.ID, topic, reason string) {
|
||||
newScore := l.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
log.WithFields(logrus.Fields{
|
||||
"peerID": peerID.String(),
|
||||
"reason": reason,
|
||||
"newScore": newScore,
|
||||
"topic": topic,
|
||||
}).Debug("Downscore peer")
|
||||
}
|
||||
|
||||
@@ -85,16 +85,16 @@ func TestRateLimiter_ExceedRawCapacity(t *testing.T) {
|
||||
require.NoError(t, err, "could not create stream")
|
||||
|
||||
for i := 0; i < 2*defaultBurstLimit; i++ {
|
||||
err = rlimiter.validateRawRpcRequest(stream)
|
||||
err = rlimiter.validateRawRpcRequest(stream, 1)
|
||||
rlimiter.addRawStream(stream)
|
||||
require.NoError(t, err, "could not validate incoming request")
|
||||
}
|
||||
// Triggers rate limit error on burst.
|
||||
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream))
|
||||
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream, 1))
|
||||
|
||||
// Make Peer bad.
|
||||
for i := 0; i < defaultBurstLimit; i++ {
|
||||
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream))
|
||||
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream, 1))
|
||||
}
|
||||
assert.NotNil(t, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer")
|
||||
require.NoError(t, stream.Close(), "could not close stream")
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/pkg/errors"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -39,7 +40,7 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
|
||||
|
||||
// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
|
||||
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
|
||||
// Fulu: https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#messages
|
||||
// Fulu: https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#messages
|
||||
if forkIndex >= version.Fulu {
|
||||
return map[string]rpcHandler{
|
||||
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
|
||||
@@ -54,7 +55,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
|
||||
// Electra: https://github.com/ethereum/consensus-specs/blob/master/specs/electra/p2p-interface.md#messages
|
||||
if forkIndex >= version.Electra {
|
||||
return map[string]rpcHandler{
|
||||
p2p.RPCStatusTopicV1: s.statusRPCHandler,
|
||||
@@ -68,7 +69,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Deneb: https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#messages
|
||||
// Deneb: https://github.com/ethereum/consensus-specs/blob/master/specs/deneb/p2p-interface.md#messages
|
||||
if forkIndex >= version.Deneb {
|
||||
return map[string]rpcHandler{
|
||||
p2p.RPCStatusTopicV1: s.statusRPCHandler,
|
||||
@@ -82,9 +83,9 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Capella: https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/p2p-interface.md#messages
|
||||
// Bellatrix: https://github.com/ethereum/consensus-specs/blob/dev/specs/bellatrix/p2p-interface.md#messages
|
||||
// Altair: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/p2p-interface.md#messages
|
||||
// Capella: https://github.com/ethereum/consensus-specs/blob/master/specs/capella/p2p-interface.md#messages
|
||||
// Bellatrix: https://github.com/ethereum/consensus-specs/blob/master/specs/bellatrix/p2p-interface.md#messages
|
||||
// Altair: https://github.com/ethereum/consensus-specs/blob/master/specs/altair/p2p-interface.md#messages
|
||||
if forkIndex >= version.Altair {
|
||||
handler := map[string]rpcHandler{
|
||||
p2p.RPCStatusTopicV1: s.statusRPCHandler,
|
||||
@@ -105,7 +106,7 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
// PhaseO: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages
|
||||
// PhaseO: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#messages
|
||||
if forkIndex >= version.Phase0 {
|
||||
return map[string]rpcHandler{
|
||||
p2p.RPCStatusTopicV1: s.statusRPCHandler,
|
||||
@@ -238,7 +239,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
defer span.End()
|
||||
span.SetAttributes(trace.StringAttribute("topic", topic))
|
||||
span.SetAttributes(trace.StringAttribute("peer", remotePeer.String()))
|
||||
log := log.WithField("peer", stream.Conn().RemotePeer().String()).WithField("topic", string(stream.Protocol()))
|
||||
log := log.WithFields(logrus.Fields{"peer": remotePeer.String(), "topic": string(stream.Protocol())})
|
||||
|
||||
// Check before hand that peer is valid.
|
||||
if err := s.cfg.p2p.Peers().IsBad(remotePeer); err != nil {
|
||||
@@ -248,7 +249,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
return
|
||||
}
|
||||
// Validate request according to peer limits.
|
||||
if err := s.rateLimiter.validateRawRpcRequest(stream); err != nil {
|
||||
if err := s.rateLimiter.validateRawRpcRequest(stream, 1); err != nil {
|
||||
log.WithError(err).Debug("Could not validate rpc request from peer")
|
||||
return
|
||||
}
|
||||
@@ -304,7 +305,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
logStreamErrors(err, topic)
|
||||
tracing.AnnotateError(span, err)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
s.downscorePeer(remotePeer, "registerRpcError")
|
||||
return
|
||||
}
|
||||
if err := handle(ctx, msg, stream); err != nil {
|
||||
@@ -324,7 +325,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
logStreamErrors(err, topic)
|
||||
tracing.AnnotateError(span, err)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
s.downscorePeer(remotePeer, "registerRpcError")
|
||||
return
|
||||
}
|
||||
if err := handle(ctx, nTyp.Elem().Interface(), stream); err != nil {
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -43,7 +44,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
|
||||
rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot())
|
||||
if err != nil {
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
|
||||
s.downscorePeer(remotePeer, "beaconBlocksByRangeRPCHandlerValidationError")
|
||||
tracing.AnnotateError(span, err)
|
||||
return err
|
||||
}
|
||||
@@ -201,3 +202,13 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch,
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) downscorePeer(peerID peer.ID, reason string, fields ...logrus.Fields) {
|
||||
log := log
|
||||
for _, field := range fields {
|
||||
log = log.WithFields(field)
|
||||
}
|
||||
|
||||
newScore := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
|
||||
}
|
||||
|
||||
@@ -92,9 +92,11 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
|
||||
return errors.New("no block roots provided")
|
||||
}
|
||||
|
||||
remotePeer := stream.Conn().RemotePeer()
|
||||
|
||||
currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
|
||||
if uint64(len(blockRoots)) > params.MaxRequestBlock(currentEpoch) {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
s.downscorePeer(remotePeer, "beaconBlocksRootRPCHandlerTooManyRoots")
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
|
||||
return errors.New("requested more than the max block limit")
|
||||
}
|
||||
|
||||
@@ -74,10 +74,13 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
|
||||
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remotePeer := stream.Conn().RemotePeer()
|
||||
|
||||
rp, err := validateBlobsByRange(r, s.cfg.chain.CurrentSlot())
|
||||
if err != nil {
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
s.downscorePeer(remotePeer, "blobSidecarsByRangeRpcHandlerValidationError")
|
||||
tracing.AnnotateError(span, err)
|
||||
return err
|
||||
}
|
||||
@@ -87,7 +90,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
|
||||
defer ticker.Stop()
|
||||
batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)
|
||||
if err != nil {
|
||||
log.WithError(err).Info("error in BlobSidecarsByRange batch")
|
||||
log.WithError(err).Error("Cannot create new block range batcher")
|
||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
||||
tracing.AnnotateError(span, err)
|
||||
return err
|
||||
@@ -112,7 +115,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
|
||||
}
|
||||
}
|
||||
if err := batch.error(); err != nil {
|
||||
log.WithError(err).Debug("error in BlobSidecarsByRange batch")
|
||||
log.WithError(err).Debug("Error in BlobSidecarsByRange batch")
|
||||
|
||||
// If a rate limit is hit, it means an error response has already been sent and the stream has been closed.
|
||||
if !errors.Is(err, p2ptypes.ErrRateLimited) {
|
||||
|
||||
@@ -39,7 +39,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
|
||||
cs := s.cfg.clock.CurrentSlot()
|
||||
remotePeer := stream.Conn().RemotePeer()
|
||||
if err := validateBlobByRootRequest(blobIdents, cs); err != nil {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
|
||||
s.downscorePeer(remotePeer, "blobSidecarsByRootRpcHandlerValidationError")
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
|
||||
rangeParameters, err := validateDataColumnsByRange(request, s.cfg.chain.CurrentSlot())
|
||||
if err != nil {
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
|
||||
s.downscorePeer(remotePeer, "dataColumnSidecarsByRangeRpcHandlerValidationError")
|
||||
tracing.AnnotateError(span, err)
|
||||
return errors.Wrap(err, "validate data columns by range")
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ var (
|
||||
)
|
||||
|
||||
// dataColumnSidecarByRootRPCHandler handles the data column sidecars by root RPC request.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
|
||||
func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
|
||||
ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler")
|
||||
defer span.End()
|
||||
@@ -42,7 +42,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
|
||||
}
|
||||
|
||||
requestedColumnIdents := *ref
|
||||
remotePeerId := stream.Conn().RemotePeer()
|
||||
remotePeer := stream.Conn().RemotePeer()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
|
||||
defer cancel()
|
||||
@@ -51,7 +51,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
|
||||
|
||||
// Penalize peers that send invalid requests.
|
||||
if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeerId)
|
||||
s.downscorePeer(remotePeer, "dataColumnSidecarByRootRPCHandlerValidationError")
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
return errors.Wrap(err, "validate data columns by root request")
|
||||
}
|
||||
@@ -85,7 +85,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
|
||||
}
|
||||
|
||||
log := log.WithFields(logrus.Fields{
|
||||
"peer": remotePeerId,
|
||||
"peer": remotePeer,
|
||||
"columns": requestedColumnsByRootLog,
|
||||
})
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -35,22 +36,40 @@ var backOffTime = map[primitives.SSZUint64]time.Duration{
|
||||
|
||||
// goodbyeRPCHandler reads the incoming goodbye rpc message from the peer.
|
||||
func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
|
||||
const amount = 1
|
||||
SetRPCStreamDeadlines(stream)
|
||||
peerID := stream.Conn().RemotePeer()
|
||||
|
||||
m, ok := msg.(*primitives.SSZUint64)
|
||||
if !ok {
|
||||
return fmt.Errorf("wrong message type for goodbye, got %T, wanted *uint64", msg)
|
||||
}
|
||||
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
|
||||
log.WithError(err).Debug("Goodbye message from rate-limited peer")
|
||||
} else {
|
||||
|
||||
isRateLimitedPeer := false
|
||||
if err := s.rateLimiter.validateRequest(stream, amount); err != nil {
|
||||
if !errors.Is(err, p2ptypes.ErrRateLimited) {
|
||||
return errors.Wrap(err, "validate request")
|
||||
}
|
||||
isRateLimitedPeer = true
|
||||
}
|
||||
|
||||
if !isRateLimitedPeer {
|
||||
s.rateLimiter.add(stream, 1)
|
||||
}
|
||||
log := log.WithField("Reason", goodbyeMessage(*m))
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).Trace("Peer has sent a goodbye message")
|
||||
s.cfg.p2p.Peers().SetNextValidTime(stream.Conn().RemotePeer(), goodByeBackoff(*m))
|
||||
// closes all streams with the peer
|
||||
return s.cfg.p2p.Disconnect(stream.Conn().RemotePeer())
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"peer": peerID,
|
||||
"reason": goodbyeMessage(*m),
|
||||
"isRateLimited": isRateLimitedPeer,
|
||||
}).Debug("Received a goodbye message")
|
||||
|
||||
s.cfg.p2p.Peers().SetNextValidTime(peerID, goodByeBackoff(*m))
|
||||
|
||||
if err := s.cfg.p2p.Disconnect(peerID); err != nil {
|
||||
return errors.Wrap(err, "disconnect")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// disconnectBadPeer checks whether peer is considered bad by some scorer, and tries to disconnect
|
||||
@@ -70,7 +89,7 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr
|
||||
"peerID": id,
|
||||
"agent": agentString(id, s.cfg.p2p.Host()),
|
||||
}).
|
||||
Debug("Sent peer disconnection")
|
||||
Debug("Sent bad peer disconnection")
|
||||
}
|
||||
|
||||
// A custom goodbye method that is used by our connection handler, in the
|
||||
|
||||
@@ -26,7 +26,7 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf
|
||||
|
||||
SetRPCStreamDeadlines(stream)
|
||||
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
|
||||
logger.WithError(err).Error("s.rateLimiter.validateRequest")
|
||||
logger.WithError(err).Error("Cannot validate request")
|
||||
return err
|
||||
}
|
||||
s.rateLimiter.add(stream, 1)
|
||||
@@ -42,7 +42,7 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf
|
||||
if err != nil {
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
|
||||
tracing.AnnotateError(span, err)
|
||||
logger.WithError(err).Error("s.cfg.beaconDB.LightClientBootstrap")
|
||||
logger.WithError(err).Error("Cannot bootstrap light client")
|
||||
return err
|
||||
}
|
||||
if bootstrap == nil {
|
||||
@@ -74,10 +74,11 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
|
||||
defer cancel()
|
||||
|
||||
logger := log.WithField("handler", p2p.LightClientUpdatesByRangeName[1:])
|
||||
remotePeer := stream.Conn().RemotePeer()
|
||||
|
||||
SetRPCStreamDeadlines(stream)
|
||||
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
|
||||
logger.WithError(err).Error("s.rateLimiter.validateRequest")
|
||||
logger.WithError(err).Error("Cannot validate request")
|
||||
return err
|
||||
}
|
||||
s.rateLimiter.add(stream, 1)
|
||||
@@ -90,7 +91,8 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
|
||||
|
||||
if r.Count == 0 {
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, "count is 0", stream)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
s.downscorePeer(remotePeer, "lightClientUpdatesByRangeRPCHandlerCount0")
|
||||
|
||||
logger.Error("Count is 0")
|
||||
return nil
|
||||
}
|
||||
@@ -102,7 +104,7 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
|
||||
endPeriod, err := math.Add64(r.StartPeriod, r.Count-1)
|
||||
if err != nil {
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
s.downscorePeer(remotePeer, "lightClientUpdatesByRangeRPCHandlerEndPeriodOverflow")
|
||||
tracing.AnnotateError(span, err)
|
||||
logger.WithError(err).Error("End period overflows")
|
||||
return err
|
||||
@@ -114,7 +116,7 @@ func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg i
|
||||
if err != nil {
|
||||
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
|
||||
tracing.AnnotateError(span, err)
|
||||
logger.WithError(err).Error("s.cfg.beaconDB.LightClientUpdates")
|
||||
logger.WithError(err).Error("Cannot retrieve light client updates")
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -153,7 +155,7 @@ func (s *Service) lightClientFinalityUpdateRPCHandler(ctx context.Context, _ int
|
||||
|
||||
SetRPCStreamDeadlines(stream)
|
||||
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
|
||||
logger.WithError(err).Error("s.rateLimiter.validateRequest")
|
||||
logger.WithError(err).Error("Cannot validate request")
|
||||
return err
|
||||
}
|
||||
s.rateLimiter.add(stream, 1)
|
||||
@@ -191,7 +193,7 @@ func (s *Service) lightClientOptimisticUpdateRPCHandler(ctx context.Context, _ i
|
||||
|
||||
SetRPCStreamDeadlines(stream)
|
||||
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
|
||||
logger.WithError(err).Error("s.rateLimiter.validateRequest")
|
||||
logger.WithError(err).Error("Cannot validate request")
|
||||
return err
|
||||
}
|
||||
s.rateLimiter.add(stream, 1)
|
||||
|
||||
@@ -172,12 +172,12 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
|
||||
// Read the METADATA response from the peer.
|
||||
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
|
||||
if err != nil {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
s.downscorePeer(peerID, "MetadataReadStatusCodeError")
|
||||
return nil, errors.Wrap(err, "read status code")
|
||||
}
|
||||
|
||||
if code != 0 {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
s.downscorePeer(peerID, "NonNullMetadataReadStatusCode")
|
||||
return nil, errors.New(errMsg)
|
||||
}
|
||||
|
||||
@@ -214,8 +214,8 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
|
||||
|
||||
// Decode the metadata from the peer.
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
return nil, err
|
||||
s.downscorePeer(peerID, "MetadataDecodeError")
|
||||
return nil, errors.Wrap(err, "decode with max length")
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// pingHandler reads the incoming ping rpc message from the peer.
|
||||
@@ -27,6 +28,7 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
|
||||
if !ok {
|
||||
return fmt.Errorf("wrong message type for ping, got %T, wanted *uint64", msg)
|
||||
}
|
||||
sequenceNumber := uint64(*m)
|
||||
|
||||
// Validate the incoming request regarding rate limiting.
|
||||
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
|
||||
@@ -39,14 +41,9 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
|
||||
peerID := stream.Conn().RemotePeer()
|
||||
|
||||
// Check if the peer sequence number is higher than the one we have in our store.
|
||||
valid, err := s.validateSequenceNum(*m, peerID)
|
||||
valid, err := s.isSequenceNumberUpToDate(sequenceNumber, peerID)
|
||||
if err != nil {
|
||||
// Descore peer for giving us a bad sequence number.
|
||||
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
|
||||
}
|
||||
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
|
||||
return errors.Wrap(err, "validate sequence number")
|
||||
}
|
||||
|
||||
@@ -141,7 +138,7 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
|
||||
|
||||
// If the peer responded with an error, increment the bad responses scorer.
|
||||
if code != 0 {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
s.downscorePeer(peerID, "NotNullPingReadStatusCode")
|
||||
return errors.Errorf("code: %d - %s", code, errMsg)
|
||||
}
|
||||
|
||||
@@ -150,15 +147,11 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
return errors.Wrap(err, "decode sequence number")
|
||||
}
|
||||
sequenceNumber := uint64(*msg)
|
||||
|
||||
// Determine if the peer's sequence number returned by the peer is higher than the one we have in our store.
|
||||
valid, err := s.validateSequenceNum(*msg, peerID)
|
||||
valid, err := s.isSequenceNumberUpToDate(sequenceNumber, peerID)
|
||||
if err != nil {
|
||||
// Descore peer for giving us a bad sequence number.
|
||||
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "validate sequence number")
|
||||
}
|
||||
|
||||
@@ -180,27 +173,36 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateSequenceNum validates the peer's sequence number.
|
||||
// - If the peer's sequence number is greater than the sequence number we have in our store for the peer, return false.
|
||||
// - If the peer's sequence number is equal to the sequence number we have in our store for the peer, return true.
|
||||
// - If the peer's sequence number is less than the sequence number we have in our store for the peer, return an error.
|
||||
func (s *Service) validateSequenceNum(seq primitives.SSZUint64, id peer.ID) (bool, error) {
|
||||
// isSequenceNumberUpToDate check if our internal sequence number for the peer is up to date wrt. the incoming one.
|
||||
// - If the incoming sequence number is greater than the sequence number we have in our store for the peer, return false.
|
||||
// - If the incoming sequence number is equal to the sequence number we have in our store for the peer, return true.
|
||||
// - If the incoming sequence number is less than the sequence number we have in our store for the peer, return an error.
|
||||
func (s *Service) isSequenceNumberUpToDate(incomingSequenceNumber uint64, peerID peer.ID) (bool, error) {
|
||||
// Retrieve the metadata for the peer we got in our store.
|
||||
md, err := s.cfg.p2p.Peers().Metadata(id)
|
||||
storedMetadata, err := s.cfg.p2p.Peers().Metadata(peerID)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "get metadata")
|
||||
return false, errors.Wrap(err, "peers metadata")
|
||||
}
|
||||
|
||||
// If we have no metadata for the peer, return false.
|
||||
if md == nil || md.IsNil() {
|
||||
if storedMetadata == nil || storedMetadata.IsNil() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// The peer's sequence number must be less than or equal to the sequence number we have in our store.
|
||||
if md.SequenceNumber() > uint64(seq) {
|
||||
storedSequenceNumber := storedMetadata.SequenceNumber()
|
||||
if storedSequenceNumber > incomingSequenceNumber {
|
||||
s.downscorePeer(peerID, "pingInvalidSequenceNumber", logrus.Fields{
|
||||
"storedSequenceNumber": storedSequenceNumber,
|
||||
"incomingSequenceNumber": incomingSequenceNumber,
|
||||
})
|
||||
return false, p2ptypes.ErrInvalidSequenceNum
|
||||
}
|
||||
|
||||
// Return true if the peer's sequence number is equal to the sequence number we have in our store.
|
||||
return md.SequenceNumber() == uint64(seq), nil
|
||||
// If this is the case, our information about the peer is outdated.
|
||||
if storedSequenceNumber < incomingSequenceNumber {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -35,6 +35,9 @@ func (s *Service) maintainPeerStatuses() {
|
||||
wg.Add(1)
|
||||
go func(id peer.ID) {
|
||||
defer wg.Done()
|
||||
|
||||
log := log.WithField("peer", id)
|
||||
|
||||
// If our peer status has not been updated correctly we disconnect over here
|
||||
// and set the connection state over here instead.
|
||||
if s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected {
|
||||
@@ -43,27 +46,26 @@ func (s *Service) maintainPeerStatuses() {
|
||||
log.WithError(err).Debug("Error when disconnecting with peer")
|
||||
}
|
||||
s.cfg.p2p.Peers().SetConnectionState(id, peers.Disconnected)
|
||||
log.WithFields(logrus.Fields{
|
||||
"peer": id,
|
||||
"reason": "maintain peer statuses - peer is not connected",
|
||||
}).Debug("Initiate peer disconnection")
|
||||
log.WithField("reason", "maintainPeerStatusesNotConnectedPeer").Debug("Initiate peer disconnection")
|
||||
return
|
||||
}
|
||||
|
||||
// Disconnect from peers that are considered bad by any of the registered scorers.
|
||||
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
|
||||
s.disconnectBadPeer(s.ctx, id, err)
|
||||
return
|
||||
}
|
||||
|
||||
// If the status hasn't been updated in the recent interval time.
|
||||
lastUpdated, err := s.cfg.p2p.Peers().ChainStateLastUpdated(id)
|
||||
if err != nil {
|
||||
// Peer has vanished; nothing to do.
|
||||
return
|
||||
}
|
||||
|
||||
if prysmTime.Now().After(lastUpdated.Add(interval)) {
|
||||
if err := s.reValidatePeer(s.ctx, id); err != nil {
|
||||
log.WithField("peer", id).WithError(err).Debug("Could not revalidate peer")
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
|
||||
log.WithError(err).Debug("Cannot re-validate peer")
|
||||
}
|
||||
}
|
||||
}(pid)
|
||||
@@ -128,19 +130,20 @@ func (s *Service) shouldReSync() bool {
|
||||
}
|
||||
|
||||
// sendRPCStatusRequest for a given topic with an expected protobuf message type.
|
||||
func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
|
||||
func (s *Service) sendRPCStatusRequest(ctx context.Context, peer peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
headRoot, err := s.cfg.chain.HeadRoot(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "head root")
|
||||
}
|
||||
|
||||
forkDigest, err := s.currentForkDigest()
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "current fork digest")
|
||||
}
|
||||
|
||||
cp := s.cfg.chain.FinalizedCheckpt()
|
||||
resp := &pb.Status{
|
||||
ForkDigest: forkDigest[:],
|
||||
@@ -149,37 +152,42 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
|
||||
HeadRoot: headRoot,
|
||||
HeadSlot: s.cfg.chain.HeadSlot(),
|
||||
}
|
||||
|
||||
log := log.WithField("peer", peer)
|
||||
|
||||
topic, err := p2p.TopicFromMessage(p2p.StatusMessageName, slots.ToEpoch(s.cfg.clock.CurrentSlot()))
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "topic from message")
|
||||
}
|
||||
stream, err := s.cfg.p2p.Send(ctx, resp, topic, id)
|
||||
|
||||
stream, err := s.cfg.p2p.Send(ctx, resp, topic, peer)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "send p2p message")
|
||||
}
|
||||
defer closeStream(stream, log)
|
||||
|
||||
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
|
||||
if err != nil {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
return err
|
||||
s.downscorePeer(peer, "statusRequestReadStatusCodeError")
|
||||
return errors.Wrap(err, "read status code")
|
||||
}
|
||||
|
||||
if code != 0 {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
|
||||
s.downscorePeer(peer, "statusRequestNonNullStatusCode")
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
msg := &pb.Status{}
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
return err
|
||||
s.downscorePeer(peer, "statusRequestDecodeError")
|
||||
return errors.Wrap(err, "decode status message")
|
||||
}
|
||||
|
||||
// If validation fails, validation error is logged, and peer status scorer will mark peer as bad.
|
||||
err = s.validateStatusMessage(ctx, msg)
|
||||
s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(id, msg, err)
|
||||
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
|
||||
s.disconnectBadPeer(s.ctx, id, err)
|
||||
s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(peer, msg, err)
|
||||
if err := s.cfg.p2p.Peers().IsBad(peer); err != nil {
|
||||
s.disconnectBadPeer(s.ctx, peer, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -238,7 +246,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
|
||||
return nil
|
||||
default:
|
||||
respCode = responseCodeInvalidRequest
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
|
||||
s.downscorePeer(remotePeer, "statusRpcHandlerInvalidMessage")
|
||||
}
|
||||
|
||||
originalErr := err
|
||||
|
||||
@@ -11,10 +11,12 @@ import (
|
||||
|
||||
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/rand"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
gcache "github.com/patrickmn/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
@@ -278,19 +280,32 @@ func (s *Service) Start() {
|
||||
// Stop the regular sync service.
|
||||
func (s *Service) Stop() error {
|
||||
defer func() {
|
||||
s.cancel()
|
||||
|
||||
if s.rateLimiter != nil {
|
||||
s.rateLimiter.free()
|
||||
}
|
||||
}()
|
||||
|
||||
// Say goodbye to all peers.
|
||||
for _, peerID := range s.cfg.p2p.Peers().Connected() {
|
||||
if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected {
|
||||
if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeClientShutdown, peerID); err != nil {
|
||||
log.WithError(err).WithField("peerID", peerID).Error("Failed to send goodbye message")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Removing RPC Stream handlers.
|
||||
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
|
||||
s.cfg.p2p.Host().RemoveStreamHandler(p)
|
||||
}
|
||||
|
||||
// Deregister Topic Subscribers.
|
||||
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
|
||||
s.unSubscribeFromTopic(t)
|
||||
}
|
||||
defer s.cancel()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
|
||||
func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
|
||||
const dataColumnSidecarSubTopic = "/data_column_sidecar_%d/"
|
||||
|
||||
@@ -74,7 +74,7 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
|
||||
verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)
|
||||
|
||||
// Start the verification process.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
|
||||
|
||||
// [REJECT] The sidecar is valid as verified by `verify_data_column_sidecar(sidecar)`.
|
||||
if err := verifier.ValidFields(); err != nil {
|
||||
|
||||
@@ -22,7 +22,7 @@ import (
|
||||
var (
|
||||
// GossipDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received on gossip
|
||||
// must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
|
||||
GossipDataColumnSidecarRequirements = []Requirement{
|
||||
RequireValidFields,
|
||||
RequireCorrectSubnet,
|
||||
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
// ByRangeRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
|
||||
// via the by range request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
|
||||
ByRangeRequestDataColumnSidecarRequirements = []Requirement{
|
||||
RequireValidFields,
|
||||
RequireSidecarInclusionProven,
|
||||
|
||||
7
changelog/james-prysm_remove-ssz-only-flag.md
Normal file
7
changelog/james-prysm_remove-ssz-only-flag.md
Normal file
@@ -0,0 +1,7 @@
|
||||
### Removed
|
||||
|
||||
- Partially reverting pr #15390 removing the `ssz-only` debug flag until there is a real usecase for the flag
|
||||
|
||||
### Added
|
||||
|
||||
- Added new PRYSM_API_OVERRIDE_ACCEPT environment variable to override ssz accept header as a replacement to flag
|
||||
3
changelog/jtraglia_dev-to-master.md
Normal file
3
changelog/jtraglia_dev-to-master.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Update links to consensus-specs to point to `master` branch
|
||||
2
changelog/manu-peer-ban-at-restart.md
Normal file
2
changelog/manu-peer-ban-at-restart.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Fixed
|
||||
- Fixed various reasons why a node is banned by its peers when it stops.
|
||||
3
changelog/tt_duty.md
Normal file
3
changelog/tt_duty.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Fixed
|
||||
|
||||
- Beacon-api proposer duty fulu computation
|
||||
@@ -51,7 +51,6 @@ type Flags struct {
|
||||
EnableExperimentalAttestationPool bool // EnableExperimentalAttestationPool enables an experimental attestation pool design.
|
||||
DisableDutiesV2 bool // DisableDutiesV2 sets validator client to use the get Duties endpoint
|
||||
EnableWeb bool // EnableWeb enables the webui on the validator client
|
||||
SSZOnly bool // SSZOnly forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled (useful for debugging)
|
||||
// Logging related toggles.
|
||||
DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.
|
||||
EnableFullSSZDataLogging bool // Enables logging for full ssz data on rejected gossip messages
|
||||
@@ -340,10 +339,6 @@ func ConfigureValidator(ctx *cli.Context) error {
|
||||
logEnabled(EnableWebFlag)
|
||||
cfg.EnableWeb = true
|
||||
}
|
||||
if ctx.Bool(SSZOnly.Name) {
|
||||
logEnabled(SSZOnly)
|
||||
cfg.SSZOnly = true
|
||||
}
|
||||
|
||||
cfg.KeystoreImportDebounceInterval = ctx.Duration(dynamicKeyReloadDebounceInterval.Name)
|
||||
Init(cfg)
|
||||
|
||||
@@ -197,12 +197,6 @@ var (
|
||||
Usage: "(Work in progress): Enables the web portal for the validator client.",
|
||||
Value: false,
|
||||
}
|
||||
|
||||
// SSZOnly forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled
|
||||
SSZOnly = &cli.BoolFlag{
|
||||
Name: "ssz-only",
|
||||
Usage: "(debug): Forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled",
|
||||
}
|
||||
)
|
||||
|
||||
// devModeFlags holds list of flags that are set when development mode is on.
|
||||
@@ -225,7 +219,6 @@ var ValidatorFlags = append(deprecatedFlags, []cli.Flag{
|
||||
EnableBeaconRESTApi,
|
||||
DisableDutiesV2,
|
||||
EnableWebFlag,
|
||||
SSZOnly,
|
||||
}...)
|
||||
|
||||
// E2EValidatorFlags contains a list of the validator feature flags to be tested in E2E.
|
||||
|
||||
@@ -4,6 +4,10 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
const (
|
||||
EnvNameOverrideAccept = "PRYSM_API_OVERRIDE_ACCEPT"
|
||||
)
|
||||
|
||||
// SetupTestConfigCleanup preserves configurations allowing to modify them within tests without any
|
||||
// restrictions, everything is restored after the test.
|
||||
func SetupTestConfigCleanup(t testing.TB) {
|
||||
|
||||
@@ -27,7 +27,7 @@ type VersionedUnmarshaler struct {
|
||||
// Fork aligns with the fork names in config/params/values.go
|
||||
Fork int
|
||||
// Version corresponds to the Version type defined in the beacon-chain spec, aka a "fork version number":
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#custom-types
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#custom-types
|
||||
Version [fieldparams.VersionLength]byte
|
||||
}
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ message BeaconBlockBody {
|
||||
|
||||
// Block operations
|
||||
// Refer to spec constants at
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
|
||||
// At most MAX_PROPOSER_SLASHINGS.
|
||||
repeated ProposerSlashing proposer_slashings = 4
|
||||
|
||||
@@ -171,7 +171,7 @@ message BeaconBlockBody {
|
||||
|
||||
// Block operations
|
||||
// Refer to spec constants at
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
|
||||
// At most MAX_PROPOSER_SLASHINGS.
|
||||
repeated ProposerSlashing proposer_slashings = 4
|
||||
@@ -402,7 +402,7 @@ message BeaconBlockBodyAltair {
|
||||
|
||||
// Block operations
|
||||
// Refer to spec constants at
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
|
||||
// At most MAX_PROPOSER_SLASHINGS.
|
||||
repeated ProposerSlashing proposer_slashings = 4
|
||||
@@ -487,7 +487,7 @@ message BeaconBlockBodyBellatrix {
|
||||
|
||||
// Block operations
|
||||
// Refer to spec constants at
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
|
||||
// At most MAX_PROPOSER_SLASHINGS.
|
||||
repeated ProposerSlashing proposer_slashings = 4
|
||||
@@ -629,7 +629,7 @@ message BeaconBlockBodyCapella {
|
||||
|
||||
// Block operations
|
||||
// Refer to spec constants at
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#max-operations-per-block
|
||||
|
||||
// At most MAX_PROPOSER_SLASHINGS.
|
||||
repeated ProposerSlashing proposer_slashings = 4
|
||||
@@ -813,7 +813,7 @@ message BeaconBlockBodyDeneb {
|
||||
|
||||
// Block operations
|
||||
// Refer to spec constants at
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#max-operations-per-block
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/core/0_beacon-chain.md#max-operations-per-block
|
||||
|
||||
// At most MAX_PROPOSER_SLASHINGS.
|
||||
repeated ProposerSlashing proposer_slashings = 4
|
||||
@@ -1040,7 +1040,7 @@ message BeaconBlockBodyElectra {
|
||||
|
||||
// Block operations
|
||||
// Refer to spec constants at
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#max-operations-per-block
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/core/0_beacon-chain.md#max-operations-per-block
|
||||
|
||||
// At most MAX_PROPOSER_SLASHINGS.
|
||||
repeated ProposerSlashing proposer_slashings = 4
|
||||
|
||||
@@ -100,7 +100,7 @@ service BeaconChain {
|
||||
// that it was included in a block. The attestation may have expired.
|
||||
// Refer to the Ethereum Beacon Chain specification for more details on how
|
||||
// attestations are processed and when they are no longer valid.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
|
||||
rpc AttestationPool(AttestationPoolRequest)
|
||||
returns (AttestationPoolResponse) {
|
||||
option deprecated = true;
|
||||
@@ -117,7 +117,7 @@ service BeaconChain {
|
||||
// that it was included in a block. The attestation may have expired.
|
||||
// Refer to the Ethereum Beacon Chain specification for more details on how
|
||||
// attestations are processed and when they are no longer valid.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#attestations
|
||||
rpc AttestationPoolElectra(AttestationPoolRequest)
|
||||
returns (AttestationPoolElectraResponse) {
|
||||
option deprecated = true;
|
||||
|
||||
@@ -191,7 +191,7 @@ message PowBlock {
|
||||
|
||||
// The beacon state for Altair hard fork 1.
|
||||
// Reference:
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/beacon-chain.md#beaconstate
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/altair/beacon-chain.md#beaconstate
|
||||
message BeaconStateAltair {
|
||||
// Versioning [1001-2000]
|
||||
uint64 genesis_time = 1001;
|
||||
@@ -747,4 +747,4 @@ message BeaconStateFulu {
|
||||
// Fields introduced in Fulu fork [13001-14000]
|
||||
repeated uint64 proposer_lookahead = 13001
|
||||
[ (ethereum.eth.ext.ssz_size) = "proposer_lookahead_size" ];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,9 +248,6 @@ func (v *ValidatorNode) Start(ctx context.Context) error {
|
||||
args = append(args,
|
||||
fmt.Sprintf("--%s=http://localhost:%d", flags.BeaconRESTApiProviderFlag.Name, beaconRestApiPort),
|
||||
fmt.Sprintf("--%s", features.EnableBeaconRESTApi.Name))
|
||||
if v.config.UseSSZOnly {
|
||||
args = append(args, fmt.Sprintf("--%s", features.SSZOnly.Name))
|
||||
}
|
||||
}
|
||||
|
||||
// Only apply e2e flags to the current branch. New flags may not exist in previous release.
|
||||
|
||||
@@ -25,14 +25,14 @@ func TestEndToEnd_MinimalConfig_Web3Signer_PersistentKeys(t *testing.T) {
|
||||
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithRemoteSignerAndPersistentKeysFile()).run()
|
||||
}
|
||||
|
||||
func TestEndToEnd_MinimalConfig_ValidatorRESTApi(t *testing.T) {
|
||||
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi()).run()
|
||||
}
|
||||
|
||||
func TestEndToEnd_MinimalConfig_ValidatorRESTApi_SSZ(t *testing.T) {
|
||||
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi(), types.WithSSZOnly()).run()
|
||||
}
|
||||
|
||||
func TestEndToEnd_MinimalConfig_ValidatorRESTApi(t *testing.T) {
|
||||
e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithValidatorRESTApi()).run()
|
||||
}
|
||||
|
||||
func TestEndToEnd_ScenarioRun_EEOffline(t *testing.T) {
|
||||
t.Skip("TODO(#10242) Prysm is current unable to handle an offline e2e")
|
||||
cfg := types.InitForkCfg(version.Bellatrix, version.Deneb, params.E2ETestConfig())
|
||||
|
||||
@@ -11,9 +11,11 @@ go_library(
|
||||
importpath = "github.com/OffchainLabs/prysm/v6/testing/endtoend/types",
|
||||
visibility = ["//testing/endtoend:__subpackages__"],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@org_golang_google_grpc//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -6,9 +6,11 @@ import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/api"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/runtime/version"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
@@ -51,18 +53,20 @@ func WithValidatorRESTApi() E2EConfigOpt {
|
||||
}
|
||||
}
|
||||
|
||||
func WithSSZOnly() E2EConfigOpt {
|
||||
return func(cfg *E2EConfig) {
|
||||
cfg.UseSSZOnly = true
|
||||
}
|
||||
}
|
||||
|
||||
func WithBuilder() E2EConfigOpt {
|
||||
return func(cfg *E2EConfig) {
|
||||
cfg.UseBuilder = true
|
||||
}
|
||||
}
|
||||
|
||||
func WithSSZOnly() E2EConfigOpt {
|
||||
return func(cfg *E2EConfig) {
|
||||
if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// E2EConfig defines the struct for all configurations needed for E2E testing.
|
||||
type E2EConfig struct {
|
||||
TestCheckpointSync bool
|
||||
@@ -76,7 +80,6 @@ type E2EConfig struct {
|
||||
UseFixedPeerIDs bool
|
||||
UseValidatorCrossClient bool
|
||||
UseBeaconRestApi bool
|
||||
UseSSZOnly bool
|
||||
UseBuilder bool
|
||||
EpochsToRun uint64
|
||||
Seed int64
|
||||
|
||||
@@ -46,7 +46,6 @@ go_library(
|
||||
"//api/server/structs:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
@@ -128,7 +127,6 @@ go_test(
|
||||
"//api/server/structs:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/rpc/eth/shared/testing:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//consensus-types/validator:go_default_library",
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/api"
|
||||
"github.com/OffchainLabs/prysm/v6/api/server/structs"
|
||||
"github.com/OffchainLabs/prysm/v6/config/features"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/assert"
|
||||
@@ -215,11 +214,6 @@ func TestGetBeaconBlock_SSZ_BellatrixValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -262,11 +256,6 @@ func TestGetBeaconBlock_SSZ_BlindedBellatrixValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoBlindedBellatrixBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -309,11 +298,6 @@ func TestGetBeaconBlock_SSZ_CapellaValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoCapellaBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -356,11 +340,6 @@ func TestGetBeaconBlock_SSZ_BlindedCapellaValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoBlindedCapellaBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -403,11 +382,6 @@ func TestGetBeaconBlock_SSZ_DenebValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoDenebBeaconBlockContents()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -450,11 +424,6 @@ func TestGetBeaconBlock_SSZ_BlindedDenebValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoBlindedDenebBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -497,11 +466,6 @@ func TestGetBeaconBlock_SSZ_ElectraValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoElectraBeaconBlockContents()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -544,11 +508,6 @@ func TestGetBeaconBlock_SSZ_BlindedElectraValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoBlindedElectraBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -591,11 +550,6 @@ func TestGetBeaconBlock_SSZ_UnsupportedVersion(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
const slot = primitives.Slot(1)
|
||||
randaoReveal := []byte{2}
|
||||
graffiti := []byte{3}
|
||||
@@ -625,11 +579,6 @@ func TestGetBeaconBlock_SSZ_InvalidBlindedHeader(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -663,11 +612,6 @@ func TestGetBeaconBlock_SSZ_InvalidVersionHeader(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoBellatrixBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -701,11 +645,6 @@ func TestGetBeaconBlock_SSZ_GetSSZError(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
const slot = primitives.Slot(1)
|
||||
randaoReveal := []byte{2}
|
||||
graffiti := []byte{3}
|
||||
@@ -731,11 +670,6 @@ func TestGetBeaconBlock_SSZ_Phase0Valid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoPhase0BeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
@@ -778,11 +712,6 @@ func TestGetBeaconBlock_SSZ_AltairValid(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
SSZOnly: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
proto := testhelpers.GenerateProtoAltairBeaconBlock()
|
||||
bytes, err := proto.MarshalSSZ()
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -7,15 +7,19 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/api"
|
||||
"github.com/OffchainLabs/prysm/v6/config/features"
|
||||
"github.com/OffchainLabs/prysm/v6/api/apiutil"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/network/httputil"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type reqOption func(*http.Request)
|
||||
|
||||
type RestHandler interface {
|
||||
Get(ctx context.Context, endpoint string, resp interface{}) error
|
||||
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
|
||||
@@ -26,16 +30,30 @@ type RestHandler interface {
|
||||
}
|
||||
|
||||
type BeaconApiRestHandler struct {
|
||||
client http.Client
|
||||
host string
|
||||
client http.Client
|
||||
host string
|
||||
reqOverrides []reqOption
|
||||
}
|
||||
|
||||
// NewBeaconApiRestHandler returns a RestHandler
|
||||
func NewBeaconApiRestHandler(client http.Client, host string) RestHandler {
|
||||
return &BeaconApiRestHandler{
|
||||
brh := &BeaconApiRestHandler{
|
||||
client: client,
|
||||
host: host,
|
||||
}
|
||||
brh.appendAcceptOverride()
|
||||
return brh
|
||||
}
|
||||
|
||||
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
|
||||
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
|
||||
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
|
||||
func (c *BeaconApiRestHandler) appendAcceptOverride() {
|
||||
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
|
||||
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
|
||||
req.Header.Set("Accept", accept)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// HttpClient returns the underlying HTTP client of the handler
|
||||
@@ -56,7 +74,6 @@ func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp in
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
||||
}
|
||||
|
||||
httpResp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
||||
@@ -76,13 +93,16 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
||||
}
|
||||
|
||||
primaryAcceptType := fmt.Sprintf("%s;q=%s", api.OctetStreamMediaType, "0.95")
|
||||
secondaryAcceptType := fmt.Sprintf("%s;q=%s", api.JsonMediaType, "0.9")
|
||||
acceptHeaderString := fmt.Sprintf("%s,%s", primaryAcceptType, secondaryAcceptType)
|
||||
if features.Get().SSZOnly {
|
||||
acceptHeaderString = api.OctetStreamMediaType
|
||||
}
|
||||
req.Header.Set("Accept", acceptHeaderString)
|
||||
|
||||
for _, o := range c.reqOverrides {
|
||||
o(req)
|
||||
}
|
||||
|
||||
httpResp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
||||
@@ -92,16 +112,17 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
|
||||
return
|
||||
}
|
||||
}()
|
||||
accept := req.Header.Get("Accept")
|
||||
contentType := httpResp.Header.Get("Content-Type")
|
||||
body, err := io.ReadAll(httpResp.Body)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
|
||||
}
|
||||
if !strings.Contains(primaryAcceptType, contentType) {
|
||||
|
||||
if !apiutil.PrimaryAcceptMatches(accept, contentType) {
|
||||
log.WithFields(logrus.Fields{
|
||||
"primaryAcceptType": primaryAcceptType,
|
||||
"secondaryAcceptType": secondaryAcceptType,
|
||||
"receivedAcceptType": contentType,
|
||||
"Accept": accept,
|
||||
"Content-Type": contentType,
|
||||
}).Debug("Server responded with non primary accept type")
|
||||
}
|
||||
|
||||
@@ -115,10 +136,6 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
|
||||
return nil, nil, errorJson
|
||||
}
|
||||
|
||||
if features.Get().SSZOnly && contentType != api.OctetStreamMediaType {
|
||||
return nil, nil, errors.Errorf("server responded with non primary accept type %s", contentType)
|
||||
}
|
||||
|
||||
return body, httpResp.Header, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -7,11 +7,13 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/api"
|
||||
"github.com/OffchainLabs/prysm/v6/api/server/structs"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/network/httputil"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/assert"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
@@ -143,6 +145,25 @@ func TestGetSSZ(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestAcceptOverrideSSZ(t *testing.T) {
|
||||
name := "TestAcceptOverride"
|
||||
orig := os.Getenv(params.EnvNameOverrideAccept)
|
||||
defer func() {
|
||||
require.NoError(t, os.Setenv(params.EnvNameOverrideAccept, orig))
|
||||
}()
|
||||
require.NoError(t, os.Setenv(params.EnvNameOverrideAccept, name))
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
require.Equal(t, name, r.Header.Get("Accept"))
|
||||
w.WriteHeader(200)
|
||||
_, err := w.Write([]byte("ok"))
|
||||
require.NoError(t, err)
|
||||
}))
|
||||
defer srv.Close()
|
||||
c := NewBeaconApiRestHandler(http.Client{Timeout: time.Second * 5}, srv.URL)
|
||||
_, _, err := c.GetSSZ(t.Context(), "/test")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestPost(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
const endpoint = "/example/rest/api/endpoint"
|
||||
|
||||
Reference in New Issue
Block a user