mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-11 06:18:05 -05:00
Compare commits
3 Commits
attDataBen
...
att-time
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab41335404 | ||
|
|
ea2624b5ab | ||
|
|
1b40f941cf |
@@ -14,7 +14,7 @@ go_library(
|
||||
"//validator:__subpackages__",
|
||||
],
|
||||
deps = [
|
||||
"//api/server:go_default_library",
|
||||
"//api/server/middleware:go_default_library",
|
||||
"//runtime:go_default_library",
|
||||
"@com_github_gorilla_mux//:go_default_library",
|
||||
"@com_github_grpc_ecosystem_grpc_gateway_v2//runtime:go_default_library",
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"github.com/gorilla/mux"
|
||||
gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/middleware"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
@@ -104,7 +104,7 @@ func (g *Gateway) Start() {
|
||||
}
|
||||
}
|
||||
|
||||
corsMux := server.CorsHandler(g.cfg.allowedOrigins).Middleware(g.cfg.router)
|
||||
corsMux := middleware.CorsHandler(g.cfg.allowedOrigins).Middleware(g.cfg.router)
|
||||
|
||||
if g.cfg.muxHandler != nil {
|
||||
g.cfg.router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@@ -2,29 +2,14 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"error.go",
|
||||
"middleware.go",
|
||||
"util.go",
|
||||
],
|
||||
srcs = ["error.go"],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/api/server",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"@com_github_gorilla_mux//:go_default_library",
|
||||
"@com_github_rs_cors//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"error_test.go",
|
||||
"middleware_test.go",
|
||||
"util_test.go",
|
||||
],
|
||||
srcs = ["error_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
],
|
||||
deps = ["//testing/assert:go_default_library"],
|
||||
)
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
||||
// NormalizeQueryValuesHandler normalizes an input query of "key=value1,value2,value3" to "key=value1&key=value2&key=value3"
|
||||
func NormalizeQueryValuesHandler(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
NormalizeQueryValues(query)
|
||||
r.URL.RawQuery = query.Encode()
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// CorsHandler sets the cors settings on api endpoints
|
||||
func CorsHandler(allowOrigins []string) mux.MiddlewareFunc {
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: allowOrigins,
|
||||
AllowedMethods: []string{http.MethodPost, http.MethodGet, http.MethodDelete, http.MethodOptions},
|
||||
AllowCredentials: true,
|
||||
MaxAge: 600,
|
||||
AllowedHeaders: []string{"*"},
|
||||
})
|
||||
|
||||
return c.Handler
|
||||
}
|
||||
29
api/server/middleware/BUILD.bazel
Normal file
29
api/server/middleware/BUILD.bazel
Normal file
@@ -0,0 +1,29 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"middleware.go",
|
||||
"util.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/api/server/middleware",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"@com_github_gorilla_mux//:go_default_library",
|
||||
"@com_github_rs_cors//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"middleware_test.go",
|
||||
"util_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
],
|
||||
)
|
||||
112
api/server/middleware/middleware.go
Normal file
112
api/server/middleware/middleware.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
||||
// NormalizeQueryValuesHandler normalizes an input query of "key=value1,value2,value3" to "key=value1&key=value2&key=value3"
|
||||
func NormalizeQueryValuesHandler(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
NormalizeQueryValues(query)
|
||||
r.URL.RawQuery = query.Encode()
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// CorsHandler sets the cors settings on api endpoints
|
||||
func CorsHandler(allowOrigins []string) mux.MiddlewareFunc {
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: allowOrigins,
|
||||
AllowedMethods: []string{http.MethodPost, http.MethodGet, http.MethodDelete, http.MethodOptions},
|
||||
AllowCredentials: true,
|
||||
MaxAge: 600,
|
||||
AllowedHeaders: []string{"*"},
|
||||
})
|
||||
|
||||
return c.Handler
|
||||
}
|
||||
|
||||
// ContentTypeHandler checks request for the appropriate media types otherwise returning a http.StatusUnsupportedMediaType error
|
||||
func ContentTypeHandler(acceptedMediaTypes []string) mux.MiddlewareFunc {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// skip the GET request
|
||||
if r.Method == http.MethodGet {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
if contentType == "" {
|
||||
http.Error(w, "Content-Type header is missing", http.StatusUnsupportedMediaType)
|
||||
return
|
||||
}
|
||||
|
||||
accepted := false
|
||||
for _, acceptedType := range acceptedMediaTypes {
|
||||
if strings.TrimSpace(contentType) == strings.TrimSpace(acceptedType) {
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !accepted {
|
||||
http.Error(w, fmt.Sprintf("Unsupported media type: %s", contentType), http.StatusUnsupportedMediaType)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// AcceptHeaderHandler checks if the client's response preference is handled
|
||||
func AcceptHeaderHandler(serverAcceptedTypes []string) mux.MiddlewareFunc {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
acceptHeader := r.Header.Get("Accept")
|
||||
// header is optional and should skip if not provided
|
||||
if acceptHeader == "" {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
accepted := false
|
||||
acceptTypes := strings.Split(acceptHeader, ",")
|
||||
// follows rules defined in https://datatracker.ietf.org/doc/html/rfc2616#section-14.1
|
||||
for _, acceptType := range acceptTypes {
|
||||
acceptType = strings.TrimSpace(acceptType)
|
||||
if acceptType == "*/*" {
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
for _, serverAcceptedType := range serverAcceptedTypes {
|
||||
if strings.HasPrefix(acceptType, serverAcceptedType) {
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
if acceptType != "/*" && strings.HasSuffix(acceptType, "/*") && strings.HasPrefix(serverAcceptedType, acceptType[:len(acceptType)-2]) {
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if accepted {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !accepted {
|
||||
http.Error(w, fmt.Sprintf("Not Acceptable: %s", acceptHeader), http.StatusNotAcceptable)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
199
api/server/middleware/middleware_test.go
Normal file
199
api/server/middleware/middleware_test.go
Normal file
@@ -0,0 +1,199 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/api"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
)
|
||||
|
||||
func TestNormalizeQueryValuesHandler(t *testing.T) {
|
||||
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, err := w.Write([]byte("next handler"))
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
handler := NormalizeQueryValuesHandler(nextHandler)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
inputQuery string
|
||||
expectedQuery string
|
||||
}{
|
||||
{
|
||||
name: "3 values",
|
||||
inputQuery: "key=value1,value2,value3",
|
||||
expectedQuery: "key=value1&key=value2&key=value3", // replace with expected normalized value
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
req, err := http.NewRequest("GET", "/test?"+test.inputQuery, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rr := httptest.NewRecorder()
|
||||
handler.ServeHTTP(rr, req)
|
||||
|
||||
if rr.Code != http.StatusOK {
|
||||
t.Errorf("handler returned wrong status code: got %v want %v", rr.Code, http.StatusOK)
|
||||
}
|
||||
|
||||
if req.URL.RawQuery != test.expectedQuery {
|
||||
t.Errorf("query not normalized: got %v want %v", req.URL.RawQuery, test.expectedQuery)
|
||||
}
|
||||
|
||||
if rr.Body.String() != "next handler" {
|
||||
t.Errorf("next handler was not executed")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContentTypeHandler(t *testing.T) {
|
||||
acceptedMediaTypes := []string{api.JsonMediaType, api.OctetStreamMediaType}
|
||||
|
||||
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, err := w.Write([]byte("next handler"))
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
handler := ContentTypeHandler(acceptedMediaTypes)(nextHandler)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
contentType string
|
||||
expectedStatusCode int
|
||||
isGet bool
|
||||
}{
|
||||
{
|
||||
name: "Accepted Content-Type - application/json",
|
||||
contentType: api.JsonMediaType,
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Accepted Content-Type - ssz format",
|
||||
contentType: api.OctetStreamMediaType,
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Unsupported Content-Type - text/plain",
|
||||
contentType: "text/plain",
|
||||
expectedStatusCode: http.StatusUnsupportedMediaType,
|
||||
},
|
||||
{
|
||||
name: "Missing Content-Type",
|
||||
contentType: "",
|
||||
expectedStatusCode: http.StatusUnsupportedMediaType,
|
||||
},
|
||||
{
|
||||
name: "GET request skips content type check",
|
||||
contentType: "",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
isGet: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
httpMethod := http.MethodPost
|
||||
if tt.isGet {
|
||||
httpMethod = http.MethodGet
|
||||
}
|
||||
req := httptest.NewRequest(httpMethod, "/", nil)
|
||||
if tt.contentType != "" {
|
||||
req.Header.Set("Content-Type", tt.contentType)
|
||||
}
|
||||
rr := httptest.NewRecorder()
|
||||
|
||||
handler.ServeHTTP(rr, req)
|
||||
|
||||
if status := rr.Code; status != tt.expectedStatusCode {
|
||||
t.Errorf("handler returned wrong status code: got %v want %v", status, tt.expectedStatusCode)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAcceptHeaderHandler(t *testing.T) {
|
||||
acceptedTypes := []string{"application/json", "application/octet-stream"}
|
||||
|
||||
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, err := w.Write([]byte("next handler"))
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
handler := AcceptHeaderHandler(acceptedTypes)(nextHandler)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
acceptHeader string
|
||||
expectedStatusCode int
|
||||
}{
|
||||
{
|
||||
name: "Accepted Accept-Type - application/json",
|
||||
acceptHeader: "application/json",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Accepted Accept-Type - application/octet-stream",
|
||||
acceptHeader: "application/octet-stream",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Accepted Accept-Type with parameters",
|
||||
acceptHeader: "application/json;q=0.9, application/octet-stream;q=0.8",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Unsupported Accept-Type - text/plain",
|
||||
acceptHeader: "text/plain",
|
||||
expectedStatusCode: http.StatusNotAcceptable,
|
||||
},
|
||||
{
|
||||
name: "Missing Accept header",
|
||||
acceptHeader: "",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "*/* is accepted",
|
||||
acceptHeader: "*/*",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "application/* is accepted",
|
||||
acceptHeader: "application/*",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "/* is unsupported",
|
||||
acceptHeader: "/*",
|
||||
expectedStatusCode: http.StatusNotAcceptable,
|
||||
},
|
||||
{
|
||||
name: "application/ is unsupported",
|
||||
acceptHeader: "application/",
|
||||
expectedStatusCode: http.StatusNotAcceptable,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", nil)
|
||||
if tt.acceptHeader != "" {
|
||||
req.Header.Set("Accept", tt.acceptHeader)
|
||||
}
|
||||
rr := httptest.NewRecorder()
|
||||
|
||||
handler.ServeHTTP(rr, req)
|
||||
|
||||
if status := rr.Code; status != tt.expectedStatusCode {
|
||||
t.Errorf("handler returned wrong status code: got %v want %v", status, tt.expectedStatusCode)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package server
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
@@ -1,4 +1,4 @@
|
||||
package server
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"testing"
|
||||
@@ -1,54 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
)
|
||||
|
||||
func TestNormalizeQueryValuesHandler(t *testing.T) {
|
||||
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, err := w.Write([]byte("next handler"))
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
handler := NormalizeQueryValuesHandler(nextHandler)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
inputQuery string
|
||||
expectedQuery string
|
||||
}{
|
||||
{
|
||||
name: "3 values",
|
||||
inputQuery: "key=value1,value2,value3",
|
||||
expectedQuery: "key=value1&key=value2&key=value3", // replace with expected normalized value
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
req, err := http.NewRequest("GET", "/test?"+test.inputQuery, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rr := httptest.NewRecorder()
|
||||
handler.ServeHTTP(rr, req)
|
||||
|
||||
if rr.Code != http.StatusOK {
|
||||
t.Errorf("handler returned wrong status code: got %v want %v", rr.Code, http.StatusOK)
|
||||
}
|
||||
|
||||
if req.URL.RawQuery != test.expectedQuery {
|
||||
t.Errorf("query not normalized: got %v want %v", req.URL.RawQuery, test.expectedQuery)
|
||||
}
|
||||
|
||||
if rr.Body.String() != "next handler" {
|
||||
t.Errorf("next handler was not executed")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,254 +0,0 @@
|
||||
package blocks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
lruwrpr "github.com/prysmaticlabs/prysm/v5/cache/lru"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
)
|
||||
|
||||
func makeAtts(b *testing.B) []*ethpb.Attestation {
|
||||
attData := make([]*ethpb.AttestationData, 0, 400)
|
||||
for i := 0; i < 400; i++ {
|
||||
bRoot := bytesutil.Bytes32(20)
|
||||
sRoot := bytesutil.Bytes32(1000)
|
||||
tRoot := bytesutil.Bytes32(2000)
|
||||
ad := ðpb.AttestationData{
|
||||
Slot: 3600,
|
||||
CommitteeIndex: primitives.CommitteeIndex(i),
|
||||
BeaconBlockRoot: bRoot,
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: primitives.Epoch(1000),
|
||||
Root: sRoot,
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: primitives.Epoch(2000),
|
||||
Root: tRoot,
|
||||
},
|
||||
}
|
||||
attData = append(attData, ad)
|
||||
}
|
||||
allAtts := make([]*ethpb.Attestation, 0)
|
||||
|
||||
for _, ad := range attData {
|
||||
copiedD := ad
|
||||
for i := 0; i < 30; i++ {
|
||||
att := ðpb.Attestation{
|
||||
AggregationBits: bitfield.NewBitlist(3),
|
||||
Data: copiedD,
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
allAtts = append(allAtts, att)
|
||||
}
|
||||
}
|
||||
return allAtts
|
||||
}
|
||||
|
||||
func BenchmarkAttestationDataRoot(b *testing.B) {
|
||||
allAtts := makeAtts(b)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, a := range allAtts {
|
||||
_, err := a.Data.HashTreeRoot()
|
||||
assert.NoError(b, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var lc = lruwrpr.New(100)
|
||||
|
||||
func getRoot(attData *ethpb.AttestationData) ([32]byte, error) {
|
||||
var key [128]byte
|
||||
|
||||
binary.LittleEndian.PutUint64(key[:8], uint64(attData.Slot))
|
||||
binary.LittleEndian.PutUint64(key[8:16], uint64(attData.CommitteeIndex))
|
||||
copy(key[16:48], attData.BeaconBlockRoot)
|
||||
binary.LittleEndian.PutUint64(key[48:56], uint64(attData.Target.Epoch))
|
||||
copy(key[56:88], attData.Target.Root)
|
||||
binary.LittleEndian.PutUint64(key[88:96], uint64(attData.Source.Epoch))
|
||||
copy(key[96:128], attData.Source.Root)
|
||||
|
||||
rt, ok := lc.Get(key)
|
||||
if ok {
|
||||
return rt.([32]byte), nil
|
||||
}
|
||||
htr, err := attData.HashTreeRoot()
|
||||
if err != nil {
|
||||
return [32]byte{}, err
|
||||
}
|
||||
lc.Add(key, htr)
|
||||
return htr, nil
|
||||
}
|
||||
|
||||
func BenchmarkAttestationDataRootWithCache(b *testing.B) {
|
||||
allAtts := makeAtts(b)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, a := range allAtts {
|
||||
rt, err := getRoot(a.Data)
|
||||
_ = rt
|
||||
assert.NoError(b, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var rtMap = map[uint64]map[uint64]map[[32]byte]map[uint64]map[[32]byte]map[uint64]map[[32]byte][32]byte{}
|
||||
var sltMap = map[uint64][]*ethpb.AttestationData{}
|
||||
var attMap = map[*ethpb.AttestationData][32]byte{}
|
||||
|
||||
var mt = new(sync.RWMutex)
|
||||
|
||||
func getRootInMap(attData *ethpb.AttestationData) ([32]byte, error) {
|
||||
mt.RLock()
|
||||
mpA, ok := rtMap[uint64(attData.Slot)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMap(attData)
|
||||
}
|
||||
mt.RLock()
|
||||
mpB, ok := mpA[uint64(attData.CommitteeIndex)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMap(attData)
|
||||
|
||||
}
|
||||
mt.RLock()
|
||||
mpC, ok := mpB[[32]byte(attData.BeaconBlockRoot)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMap(attData)
|
||||
|
||||
}
|
||||
mt.RLock()
|
||||
mpD, ok := mpC[uint64(attData.Target.Epoch)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMap(attData)
|
||||
|
||||
}
|
||||
mt.RLock()
|
||||
mpE, ok := mpD[[32]byte(attData.Target.Root)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMap(attData)
|
||||
|
||||
}
|
||||
mt.RLock()
|
||||
mpF, ok := mpE[uint64(attData.Target.Epoch)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMap(attData)
|
||||
}
|
||||
mt.RLock()
|
||||
htr, ok := mpF[[32]byte(attData.Target.Root)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMap(attData)
|
||||
}
|
||||
return htr, nil
|
||||
}
|
||||
|
||||
func recoverRtInMap(attData *ethpb.AttestationData) ([32]byte, error) {
|
||||
htr, err := attData.HashTreeRoot()
|
||||
if err != nil {
|
||||
return [32]byte{}, err
|
||||
}
|
||||
mt.Lock()
|
||||
rtMap[uint64(attData.Slot)] = map[uint64]map[[32]byte]map[uint64]map[[32]byte]map[uint64]map[[32]byte][32]byte{}
|
||||
rtMap[uint64(attData.Slot)][uint64(attData.CommitteeIndex)] = map[[32]byte]map[uint64]map[[32]byte]map[uint64]map[[32]byte][32]byte{}
|
||||
rtMap[uint64(attData.Slot)][uint64(attData.CommitteeIndex)][[32]byte(attData.BeaconBlockRoot)] = map[uint64]map[[32]byte]map[uint64]map[[32]byte][32]byte{}
|
||||
rtMap[uint64(attData.Slot)][uint64(attData.CommitteeIndex)][[32]byte(attData.BeaconBlockRoot)][uint64(attData.Target.Epoch)] = map[[32]byte]map[uint64]map[[32]byte][32]byte{}
|
||||
rtMap[uint64(attData.Slot)][uint64(attData.CommitteeIndex)][[32]byte(attData.BeaconBlockRoot)][uint64(attData.Target.Epoch)][[32]byte(attData.Target.Root)] = map[uint64]map[[32]byte][32]byte{}
|
||||
rtMap[uint64(attData.Slot)][uint64(attData.CommitteeIndex)][[32]byte(attData.BeaconBlockRoot)][uint64(attData.Target.Epoch)][[32]byte(attData.Target.Root)][uint64(attData.Source.Epoch)] = map[[32]byte][32]byte{}
|
||||
rtMap[uint64(attData.Slot)][uint64(attData.CommitteeIndex)][[32]byte(attData.BeaconBlockRoot)][uint64(attData.Target.Epoch)][[32]byte(attData.Target.Root)][uint64(attData.Source.Epoch)][[32]byte(attData.Source.Root)] = htr
|
||||
mt.Unlock()
|
||||
return htr, nil
|
||||
}
|
||||
|
||||
func BenchmarkAttestationDataRootWithMap(b *testing.B) {
|
||||
allAtts := makeAtts(b)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, a := range allAtts {
|
||||
rt, err := getRootInMap(a.Data)
|
||||
_ = rt
|
||||
assert.NoError(b, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func recoverRtInMapNew(attData *ethpb.AttestationData) ([32]byte, error) {
|
||||
htr, err := attData.HashTreeRoot()
|
||||
if err != nil {
|
||||
return [32]byte{}, err
|
||||
}
|
||||
mt.Lock()
|
||||
sltMap[uint64(attData.Slot)] = append(sltMap[uint64(attData.Slot)], attData)
|
||||
attMap[attData] = htr
|
||||
mt.Unlock()
|
||||
return htr, nil
|
||||
}
|
||||
|
||||
func getRootInMapNew(attData *ethpb.AttestationData) ([32]byte, error) {
|
||||
mt.RLock()
|
||||
attList, ok := sltMap[uint64(attData.Slot)]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMapNew(attData)
|
||||
}
|
||||
for _, a := range attList {
|
||||
if attData.CommitteeIndex != a.CommitteeIndex {
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(attData.BeaconBlockRoot, a.BeaconBlockRoot) {
|
||||
continue
|
||||
}
|
||||
if attData.Target.Epoch != a.Target.Epoch {
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(attData.Target.Root, a.Target.Root) {
|
||||
continue
|
||||
}
|
||||
if attData.Source.Epoch != a.Source.Epoch {
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(attData.Source.Root, a.Source.Root) {
|
||||
continue
|
||||
}
|
||||
mt.RLock()
|
||||
val, ok := attMap[a]
|
||||
mt.RUnlock()
|
||||
if !ok {
|
||||
return recoverRtInMapNew(attData)
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
return recoverRtInMapNew(attData)
|
||||
}
|
||||
|
||||
func BenchmarkAttestationDataRootWithMapNew(b *testing.B) {
|
||||
allAtts := makeAtts(b)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, a := range allAtts {
|
||||
rt, err := getRootInMapNew(a.Data)
|
||||
_ = rt
|
||||
assert.NoError(b, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -92,14 +92,16 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
|
||||
|
||||
func (p *blobPruner) warmCache() error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
defer func() {
|
||||
if !p.warmed {
|
||||
p.warmed = true
|
||||
close(p.cacheReady)
|
||||
}
|
||||
p.Unlock()
|
||||
}()
|
||||
if err := p.prune(0); err != nil {
|
||||
return err
|
||||
}
|
||||
if !p.warmed {
|
||||
p.warmed = true
|
||||
close(p.cacheReady)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,16 +2,19 @@ package filesystem
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/util"
|
||||
"github.com/spf13/afero"
|
||||
@@ -34,6 +37,34 @@ func TestTryPruneDir_CachedNotExpired(t *testing.T) {
|
||||
require.Equal(t, 0, pruned)
|
||||
}
|
||||
|
||||
func TestCacheWarmFail(t *testing.T) {
|
||||
fs := afero.NewMemMapFs()
|
||||
n := blobNamer{root: bytesutil.ToBytes32([]byte("derp")), index: 0}
|
||||
bp := n.path()
|
||||
mkdir := path.Dir(bp)
|
||||
require.NoError(t, fs.MkdirAll(mkdir, directoryPermissions))
|
||||
|
||||
// Create an empty blob index in the fs by touching the file at a seemingly valid path.
|
||||
fi, err := fs.Create(bp)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, fi.Close())
|
||||
|
||||
// Cache warm should fail due to the unexpected EOF.
|
||||
pr, err := newBlobPruner(fs, 0)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, pr.warmCache(), errPruningFailures)
|
||||
|
||||
// The cache warm has finished, so calling waitForCache with a super short deadline
|
||||
// should not block or hit the context deadline.
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(1*time.Millisecond))
|
||||
defer cancel()
|
||||
c, err := pr.waitForCache(ctx)
|
||||
// We will get an error and a nil value for the cache if we hit the deadline.
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, c)
|
||||
}
|
||||
|
||||
func TestTryPruneDir_CachedExpired(t *testing.T) {
|
||||
t.Run("empty directory", func(t *testing.T) {
|
||||
fs := afero.NewMemMapFs()
|
||||
|
||||
@@ -16,7 +16,7 @@ go_library(
|
||||
],
|
||||
deps = [
|
||||
"//api/gateway:go_default_library",
|
||||
"//api/server:go_default_library",
|
||||
"//api/server/middleware:go_default_library",
|
||||
"//async/event:go_default_library",
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/builder:go_default_library",
|
||||
|
||||
@@ -20,7 +20,7 @@ import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/pkg/errors"
|
||||
apigateway "github.com/prysmaticlabs/prysm/v5/api/gateway"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/middleware"
|
||||
"github.com/prysmaticlabs/prysm/v5/async/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
|
||||
@@ -409,8 +409,8 @@ func newRouter(cliCtx *cli.Context) *mux.Router {
|
||||
allowedOrigins = strings.Split(flags.GPRCGatewayCorsDomain.Value, ",")
|
||||
}
|
||||
r := mux.NewRouter()
|
||||
r.Use(server.NormalizeQueryValuesHandler)
|
||||
r.Use(server.CorsHandler(allowedOrigins))
|
||||
r.Use(middleware.NormalizeQueryValuesHandler)
|
||||
r.Use(middleware.CorsHandler(allowedOrigins))
|
||||
return r
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//api/server/middleware:go_default_library",
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/builder:go_default_library",
|
||||
"//beacon-chain/cache:go_default_library",
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
@@ -311,13 +310,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
for _, e := range endpoints {
|
||||
s.cfg.Router.HandleFunc(
|
||||
e.template,
|
||||
promhttp.InstrumentHandlerDuration(
|
||||
httpRequestLatency.MustCurryWith(prometheus.Labels{"endpoint": e.name}),
|
||||
promhttp.InstrumentHandlerCounter(
|
||||
httpRequestCount.MustCurryWith(prometheus.Labels{"endpoint": e.name}),
|
||||
e.handler,
|
||||
),
|
||||
),
|
||||
e.handlerWithMiddleware(),
|
||||
).Methods(e.methods...)
|
||||
}
|
||||
|
||||
|
||||
@@ -150,6 +150,12 @@ var (
|
||||
Help: "Time to verify gossiped blob sidecars",
|
||||
},
|
||||
)
|
||||
beaconAttestationReachHalfSummary = promauto.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "attestation_reach_half_milliseconds",
|
||||
Help: "Time for attestations to reach half",
|
||||
},
|
||||
)
|
||||
pendingAttCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "gossip_pending_attestations_total",
|
||||
Help: "increased when receiving a new pending attestation",
|
||||
|
||||
@@ -158,6 +158,7 @@ type Service struct {
|
||||
newBlobVerifier verification.NewBlobVerifier
|
||||
availableBlocker coverage.AvailableBlocker
|
||||
ctxMap ContextByteVersions
|
||||
attReceived chan struct{}
|
||||
}
|
||||
|
||||
// NewService initializes new regular sync service.
|
||||
@@ -173,6 +174,7 @@ func NewService(ctx context.Context, opts ...Option) *Service {
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
|
||||
signatureChan: make(chan *signatureVerifier, verifierLimit),
|
||||
attReceived: make(chan struct{}),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
if err := opt(r); err != nil {
|
||||
@@ -237,6 +239,8 @@ func (s *Service) Start() {
|
||||
s.maintainPeerStatuses()
|
||||
s.resyncIfBehind()
|
||||
|
||||
go s.beaconAttestationWatcher()
|
||||
|
||||
// Update sync metrics.
|
||||
async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)
|
||||
}
|
||||
|
||||
@@ -33,6 +33,8 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
|
||||
return nil
|
||||
}
|
||||
|
||||
s.attReceived <- struct{}{}
|
||||
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
|
||||
}
|
||||
|
||||
@@ -59,3 +61,29 @@ func (_ *Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 {
|
||||
}
|
||||
return slice.SetUint64(commIds)
|
||||
}
|
||||
|
||||
func (s *Service) beaconAttestationWatcher() {
|
||||
slotTicker := slots.NewSlotTicker(s.cfg.chain.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
|
||||
count := 0
|
||||
for {
|
||||
select {
|
||||
case currSlot := <-slotTicker.C():
|
||||
log.Infof("Beacon Attestation Watcher: Clearing Slot: %d, Total received: %d", currSlot, count)
|
||||
count = 0
|
||||
case <-s.attReceived:
|
||||
count++
|
||||
// 1014657 / 32 / 2 =15854
|
||||
if count == 15854 {
|
||||
t := uint64(s.cfg.chain.GenesisTime().Unix())
|
||||
d := slots.TimeIntoSlot(t)
|
||||
currentSlot := slots.CurrentSlot(t)
|
||||
duration := d.Milliseconds() - 4000
|
||||
beaconAttestationReachHalfSummary.Observe(float64(duration))
|
||||
log.Infof("Beacon Attestation Watcher: Receive enough attestation for slot: %d and it took time %d", currentSlot, duration)
|
||||
}
|
||||
case <-s.ctx.Done():
|
||||
slotTicker.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ go_library(
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//api/gateway:go_default_library",
|
||||
"//api/server:go_default_library",
|
||||
"//api/server/middleware:go_default_library",
|
||||
"//async/event:go_default_library",
|
||||
"//cmd:go_default_library",
|
||||
"//cmd/validator/flags:go_default_library",
|
||||
|
||||
@@ -25,7 +25,7 @@ import (
|
||||
fastssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/prysm/v5/api"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/gateway"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/middleware"
|
||||
"github.com/prysmaticlabs/prysm/v5/async/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/cmd"
|
||||
"github.com/prysmaticlabs/prysm/v5/cmd/validator/flags"
|
||||
@@ -155,8 +155,8 @@ func newRouter(cliCtx *cli.Context) *mux.Router {
|
||||
allowedOrigins = strings.Split(flags.GRPCGatewayCorsDomain.Value, ",")
|
||||
}
|
||||
r := mux.NewRouter()
|
||||
r.Use(server.NormalizeQueryValuesHandler)
|
||||
r.Use(server.CorsHandler(allowedOrigins))
|
||||
r.Use(middleware.NormalizeQueryValuesHandler)
|
||||
r.Use(middleware.CorsHandler(allowedOrigins))
|
||||
return r
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user