Compare commits

..

1 Commits

Author SHA1 Message Date
Kasey Kirkham
b1fc178b7d support env var to customize Accept header 2025-06-26 12:08:23 -05:00
5 changed files with 50 additions and 87 deletions

View File

@@ -6,6 +6,7 @@ import (
"net"
"net/http"
"net/url"
"os"
"github.com/pkg/errors"
)
@@ -14,14 +15,17 @@ const (
MaxBodySize int64 = 1 << 23 // 8MB default, WithMaxBodySize can override
MaxBodySizeState int64 = 1 << 29 // 512MB
MaxErrBodySize int64 = 1 << 17 // 128KB
envNameOverrideAccept = "PRYSM_API_OVERRIDE_ACCEPT"
)
// Client is a wrapper object around the HTTP client.
type Client struct {
hc *http.Client
baseURL *url.URL
token string
maxBodySize int64
hc *http.Client
baseURL *url.URL
token string
maxBodySize int64
reqOverrides []ReqOption
}
// NewClient constructs a new client with the provided options (ex WithTimeout).
@@ -40,9 +44,21 @@ func NewClient(host string, opts ...ClientOpt) (*Client, error) {
for _, o := range opts {
o(c)
}
c.appendAcceptOverride()
return c, nil
}
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *Client) appendAcceptOverride() {
if accept := os.Getenv(envNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
})
}
}
// Token returns the bearer token used for jwt authentication
func (c *Client) Token() string {
return c.token
@@ -55,6 +71,9 @@ func (c *Client) BaseURL() *url.URL {
// Do execute the request against the http client
func (c *Client) Do(req *http.Request) (*http.Response, error) {
for _, o := range c.reqOverrides {
o(req)
}
return c.hc.Do(req)
}
@@ -87,7 +106,7 @@ func (c *Client) Get(ctx context.Context, path string, opts ...ReqOption) ([]byt
for _, o := range opts {
o(req)
}
r, err := c.hc.Do(req)
r, err := c.Do(req)
if err != nil {
return nil, err
}

View File

@@ -1,7 +1,10 @@
package client
import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"github.com/OffchainLabs/prysm/v6/testing/require"
@@ -46,3 +49,23 @@ func TestBaseURL(t *testing.T) {
require.Equal(t, "www.offchainlabs.com", cl.BaseURL().Hostname())
require.Equal(t, "3500", cl.BaseURL().Port())
}
func TestAcceptOverride(t *testing.T) {
name := "TestAcceptOverride"
orig := os.Getenv(envNameOverrideAccept)
defer func() {
os.Setenv(envNameOverrideAccept, orig)
}()
os.Setenv(envNameOverrideAccept, name)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, name, r.Header.Get("Accept"))
w.WriteHeader(200)
w.Write([]byte("ok"))
}))
defer srv.Close()
c, err := NewClient(srv.URL)
require.NoError(t, err)
b, err := c.Get(t.Context(), "/test")
require.NoError(t, err)
require.Equal(t, "ok", string(b))
}

View File

@@ -13,7 +13,7 @@ import (
const signatureVerificationInterval = 50 * time.Millisecond
const verifierLimit = 1000
const verifierLimit = 50
type signatureVerifier struct {
set *bls.SignatureBatch

View File

@@ -3,8 +3,6 @@ package sync
import (
"context"
"fmt"
"sync"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/config/features"
@@ -14,35 +12,15 @@ import (
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
var (
attestationTracker = make(map[primitives.Slot]*slotAttestationTracker)
attestationTrackerMu sync.RWMutex
)
type slotAttestationTracker struct {
count uint64
startTime time.Time
loggedThresholds map[int]bool
mu sync.RWMutex
}
func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, msg proto.Message) error {
a, ok := msg.(eth.Att)
if !ok {
return fmt.Errorf("message was not type eth.Att, type=%T", msg)
}
currentSlot := s.cfg.clock.CurrentSlot()
attSlot := a.GetData().GetSlot()
if attSlot == currentSlot {
s.trackAttestationArrival(attSlot)
}
if features.Get().EnableExperimentalAttestationPool {
return s.cfg.attestationCache.Add(a)
} else {
@@ -57,65 +35,6 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
}
}
func (s *Service) trackAttestationArrival(slot primitives.Slot) {
attestationTrackerMu.Lock()
tracker, exists := attestationTracker[slot]
if !exists {
slotStartTime, err := slots.ToTime(uint64(s.cfg.clock.GenesisTime().Unix()), slot)
if err != nil {
attestationTrackerMu.Unlock()
return
}
tracker = &slotAttestationTracker{
startTime: slotStartTime,
loggedThresholds: make(map[int]bool),
}
attestationTracker[slot] = tracker
}
attestationTrackerMu.Unlock()
tracker.mu.Lock()
tracker.count++
currentCount := tracker.count
sinceStart := time.Since(tracker.startTime)
tracker.mu.Unlock()
expectedAttestations := 1083289 / 32
percentage := float64(currentCount) / float64(expectedAttestations) * 100
thresholds := []int{40, 50, 66, 80, 90, 98}
tracker.mu.Lock()
defer tracker.mu.Unlock()
for _, threshold := range thresholds {
if percentage >= float64(threshold) && !tracker.loggedThresholds[threshold] {
log.WithFields(logrus.Fields{
"slotStartTime": tracker.startTime.Unix(),
"slot": slot,
"count": currentCount,
"expected": expectedAttestations,
"percentage": fmt.Sprintf("%.1f%%", percentage),
"sinceSlotStartTime": sinceStart,
"sinceAttCutoffTime": sinceStart - 4*time.Second,
}).Info("Attestation arrival threshold reached")
tracker.loggedThresholds[threshold] = true
}
}
s.cleanupOldTrackers(slot)
}
func (s *Service) cleanupOldTrackers(currentSlot primitives.Slot) {
attestationTrackerMu.Lock()
defer attestationTrackerMu.Unlock()
for slot := range attestationTracker {
if slot < currentSlot-2 {
delete(attestationTracker, slot)
}
}
}
func (*Service) persistentSubnetIndices() []uint64 {
return cache.SubnetIDs.GetAllSubnets()
}

View File

@@ -0,0 +1,2 @@
## Added
- env var to force beacon api client to use a custom accept header (eg force json responses).