mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-11 06:18:05 -05:00
Compare commits
1 Commits
log-att
...
customize-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b1fc178b7d |
@@ -6,6 +6,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@@ -14,14 +15,17 @@ const (
|
||||
MaxBodySize int64 = 1 << 23 // 8MB default, WithMaxBodySize can override
|
||||
MaxBodySizeState int64 = 1 << 29 // 512MB
|
||||
MaxErrBodySize int64 = 1 << 17 // 128KB
|
||||
|
||||
envNameOverrideAccept = "PRYSM_API_OVERRIDE_ACCEPT"
|
||||
)
|
||||
|
||||
// Client is a wrapper object around the HTTP client.
|
||||
type Client struct {
|
||||
hc *http.Client
|
||||
baseURL *url.URL
|
||||
token string
|
||||
maxBodySize int64
|
||||
hc *http.Client
|
||||
baseURL *url.URL
|
||||
token string
|
||||
maxBodySize int64
|
||||
reqOverrides []ReqOption
|
||||
}
|
||||
|
||||
// NewClient constructs a new client with the provided options (ex WithTimeout).
|
||||
@@ -40,9 +44,21 @@ func NewClient(host string, opts ...ClientOpt) (*Client, error) {
|
||||
for _, o := range opts {
|
||||
o(c)
|
||||
}
|
||||
c.appendAcceptOverride()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
|
||||
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
|
||||
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
|
||||
func (c *Client) appendAcceptOverride() {
|
||||
if accept := os.Getenv(envNameOverrideAccept); accept != "" {
|
||||
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
|
||||
req.Header.Set("Accept", accept)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Token returns the bearer token used for jwt authentication
|
||||
func (c *Client) Token() string {
|
||||
return c.token
|
||||
@@ -55,6 +71,9 @@ func (c *Client) BaseURL() *url.URL {
|
||||
|
||||
// Do execute the request against the http client
|
||||
func (c *Client) Do(req *http.Request) (*http.Response, error) {
|
||||
for _, o := range c.reqOverrides {
|
||||
o(req)
|
||||
}
|
||||
return c.hc.Do(req)
|
||||
}
|
||||
|
||||
@@ -87,7 +106,7 @@ func (c *Client) Get(ctx context.Context, path string, opts ...ReqOption) ([]byt
|
||||
for _, o := range opts {
|
||||
o(req)
|
||||
}
|
||||
r, err := c.hc.Do(req)
|
||||
r, err := c.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
@@ -46,3 +49,23 @@ func TestBaseURL(t *testing.T) {
|
||||
require.Equal(t, "www.offchainlabs.com", cl.BaseURL().Hostname())
|
||||
require.Equal(t, "3500", cl.BaseURL().Port())
|
||||
}
|
||||
|
||||
func TestAcceptOverride(t *testing.T) {
|
||||
name := "TestAcceptOverride"
|
||||
orig := os.Getenv(envNameOverrideAccept)
|
||||
defer func() {
|
||||
os.Setenv(envNameOverrideAccept, orig)
|
||||
}()
|
||||
os.Setenv(envNameOverrideAccept, name)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
require.Equal(t, name, r.Header.Get("Accept"))
|
||||
w.WriteHeader(200)
|
||||
w.Write([]byte("ok"))
|
||||
}))
|
||||
defer srv.Close()
|
||||
c, err := NewClient(srv.URL)
|
||||
require.NoError(t, err)
|
||||
b, err := c.Get(t.Context(), "/test")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "ok", string(b))
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
|
||||
const signatureVerificationInterval = 50 * time.Millisecond
|
||||
|
||||
const verifierLimit = 1000
|
||||
const verifierLimit = 50
|
||||
|
||||
type signatureVerifier struct {
|
||||
set *bls.SignatureBatch
|
||||
|
||||
@@ -3,8 +3,6 @@ package sync
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
|
||||
"github.com/OffchainLabs/prysm/v6/config/features"
|
||||
@@ -14,35 +12,15 @@ import (
|
||||
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
var (
|
||||
attestationTracker = make(map[primitives.Slot]*slotAttestationTracker)
|
||||
attestationTrackerMu sync.RWMutex
|
||||
)
|
||||
|
||||
type slotAttestationTracker struct {
|
||||
count uint64
|
||||
startTime time.Time
|
||||
loggedThresholds map[int]bool
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, msg proto.Message) error {
|
||||
a, ok := msg.(eth.Att)
|
||||
if !ok {
|
||||
return fmt.Errorf("message was not type eth.Att, type=%T", msg)
|
||||
}
|
||||
|
||||
currentSlot := s.cfg.clock.CurrentSlot()
|
||||
attSlot := a.GetData().GetSlot()
|
||||
|
||||
if attSlot == currentSlot {
|
||||
s.trackAttestationArrival(attSlot)
|
||||
}
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return s.cfg.attestationCache.Add(a)
|
||||
} else {
|
||||
@@ -57,65 +35,6 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) trackAttestationArrival(slot primitives.Slot) {
|
||||
attestationTrackerMu.Lock()
|
||||
tracker, exists := attestationTracker[slot]
|
||||
if !exists {
|
||||
slotStartTime, err := slots.ToTime(uint64(s.cfg.clock.GenesisTime().Unix()), slot)
|
||||
if err != nil {
|
||||
attestationTrackerMu.Unlock()
|
||||
return
|
||||
}
|
||||
tracker = &slotAttestationTracker{
|
||||
startTime: slotStartTime,
|
||||
loggedThresholds: make(map[int]bool),
|
||||
}
|
||||
attestationTracker[slot] = tracker
|
||||
}
|
||||
attestationTrackerMu.Unlock()
|
||||
|
||||
tracker.mu.Lock()
|
||||
tracker.count++
|
||||
currentCount := tracker.count
|
||||
sinceStart := time.Since(tracker.startTime)
|
||||
tracker.mu.Unlock()
|
||||
|
||||
expectedAttestations := 1083289 / 32
|
||||
percentage := float64(currentCount) / float64(expectedAttestations) * 100
|
||||
thresholds := []int{40, 50, 66, 80, 90, 98}
|
||||
|
||||
tracker.mu.Lock()
|
||||
defer tracker.mu.Unlock()
|
||||
|
||||
for _, threshold := range thresholds {
|
||||
if percentage >= float64(threshold) && !tracker.loggedThresholds[threshold] {
|
||||
log.WithFields(logrus.Fields{
|
||||
"slotStartTime": tracker.startTime.Unix(),
|
||||
"slot": slot,
|
||||
"count": currentCount,
|
||||
"expected": expectedAttestations,
|
||||
"percentage": fmt.Sprintf("%.1f%%", percentage),
|
||||
"sinceSlotStartTime": sinceStart,
|
||||
"sinceAttCutoffTime": sinceStart - 4*time.Second,
|
||||
}).Info("Attestation arrival threshold reached")
|
||||
tracker.loggedThresholds[threshold] = true
|
||||
}
|
||||
}
|
||||
|
||||
s.cleanupOldTrackers(slot)
|
||||
}
|
||||
|
||||
func (s *Service) cleanupOldTrackers(currentSlot primitives.Slot) {
|
||||
attestationTrackerMu.Lock()
|
||||
defer attestationTrackerMu.Unlock()
|
||||
|
||||
for slot := range attestationTracker {
|
||||
if slot < currentSlot-2 {
|
||||
delete(attestationTracker, slot)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (*Service) persistentSubnetIndices() []uint64 {
|
||||
return cache.SubnetIDs.GetAllSubnets()
|
||||
}
|
||||
|
||||
2
changelog/kasey_customize-http-accept-header.md
Normal file
2
changelog/kasey_customize-http-accept-header.md
Normal file
@@ -0,0 +1,2 @@
|
||||
## Added
|
||||
- env var to force beacon api client to use a custom accept header (eg force json responses).
|
||||
Reference in New Issue
Block a user