Compare commits

...

4 Commits

Author SHA1 Message Date
satushh
c27a48aa24 exactly one goroutine computes at a time 2026-03-03 22:38:41 +05:30
satushh
ddd2883de5 lint 2026-03-03 12:45:32 +05:30
satushh
771b1fda14 metric fix 2026-03-03 12:34:12 +05:30
satushh
b801b12af1 cache for payload committee computation 2026-03-02 22:10:21 +05:30
10 changed files with 451 additions and 3 deletions

View File

@@ -18,6 +18,7 @@ go_library(
"interfaces.go",
"log.go",
"payload_attestation.go",
"payload_committee.go",
"payload_id.go",
"proposer_indices.go",
"proposer_indices_disabled.go", # keep
@@ -78,6 +79,7 @@ go_test(
"committee_fuzz_test.go",
"committee_test.go",
"payload_attestation_test.go",
"payload_committee_test.go",
"payload_id_test.go",
"private_access_test.go",
"proposer_indices_test.go",

127
beacon-chain/cache/payload_committee.go vendored Normal file
View File

@@ -0,0 +1,127 @@
//go:build !fuzz
package cache
import (
"context"
"sync"
"time"
lruwrpr "github.com/OffchainLabs/prysm/v7/cache/lru"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// maxPayloadCommitteeCacheSize bounds how many payload committee entries the
// cache retains. 64 covers two full epochs of slots.
const maxPayloadCommitteeCacheSize = 64
// PayloadCommitteeCacheMiss counts payload committee lookups that found no entry in the cache.
var PayloadCommitteeCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
	Name: "payload_committee_cache_miss",
	Help: "The number of payload committee requests that aren't present in the cache.",
})

// PayloadCommitteeCacheHit counts payload committee lookups that were served from the cache.
var PayloadCommitteeCacheHit = promauto.NewCounter(prometheus.CounterOpts{
	Name: "payload_committee_cache_hit",
	Help: "The number of payload committee requests that are present in the cache.",
})
// PayloadCommitteeCache stores payload timeliness committee results in an LRU
// cache keyed by ptcSeed, and tracks which seeds are currently being computed.
type PayloadCommitteeCache struct {
	// lock is held while inProgress is read or modified, and while Clear
	// swaps in a fresh cache and in-progress map.
	lock sync.RWMutex
	// inProgress flags the seed keys whose committee is being computed.
	inProgress map[string]bool
	// cache holds the computed committees.
	cache *lru.Cache
}
// NewPayloadCommitteeCache builds an empty payload committee cache. The
// backing LRU and the in-progress map are initialised through Clear.
func NewPayloadCommitteeCache() *PayloadCommitteeCache {
	fresh := new(PayloadCommitteeCache)
	fresh.Clear()
	return fresh
}
// Get returns the cached payload committee for the given seed.
//
// If the seed is currently marked in progress, Get first waits until the mark
// is cleared or ctx is done; in the latter case the context error is returned.
// A cache miss yields (nil, nil) and increments PayloadCommitteeCacheMiss; a
// hit increments PayloadCommitteeCacheHit. ErrIncorrectType is returned when
// the stored value is not a []primitives.ValidatorIndex.
func (c *PayloadCommitteeCache) Get(ctx context.Context, seed [32]byte) ([]primitives.ValidatorIndex, error) {
	if err := c.checkInProgress(ctx, seed); err != nil {
		return nil, err
	}

	// Clear replaces c.cache while holding the write lock, so snapshot the
	// pointer under the read lock to avoid a data race with it.
	c.lock.RLock()
	store := c.cache
	c.lock.RUnlock()

	obj, exists := store.Get(key(seed))
	if !exists {
		PayloadCommitteeCacheMiss.Inc()
		return nil, nil
	}
	PayloadCommitteeCacheHit.Inc()

	indices, ok := obj.([]primitives.ValidatorIndex)
	if !ok {
		return nil, ErrIncorrectType
	}
	return indices, nil
}
// Add stores a payload committee result in the cache under the given seed.
func (c *PayloadCommitteeCache) Add(seed [32]byte, indices []primitives.ValidatorIndex) {
	// Clear replaces c.cache while holding the write lock, so snapshot the
	// pointer under the read lock to avoid a data race with it.
	c.lock.RLock()
	store := c.cache
	c.lock.RUnlock()

	store.Add(key(seed), indices)
}
// MarkInProgress flags the seed as currently being computed. If the seed is
// already flagged it returns ErrAlreadyInProgress and leaves the flag untouched.
func (c *PayloadCommitteeCache) MarkInProgress(seed [32]byte) error {
	k := key(seed)

	c.lock.Lock()
	defer c.lock.Unlock()

	if busy := c.inProgress[k]; busy {
		return ErrAlreadyInProgress
	}
	c.inProgress[k] = true
	return nil
}
// MarkNotInProgress removes the in-progress flag for the given seed.
// It always returns nil, including when the seed was not flagged.
func (c *PayloadCommitteeCache) MarkNotInProgress(seed [32]byte) error {
	k := key(seed)

	c.lock.Lock()
	delete(c.inProgress, k)
	c.lock.Unlock()

	return nil
}
// Clear discards all cached committees and all in-progress flags by swapping
// in a fresh LRU cache and a fresh in-progress map.
func (c *PayloadCommitteeCache) Clear() {
	freshCache := lruwrpr.New(maxPayloadCommitteeCacheSize)
	freshInProgress := make(map[string]bool)

	c.lock.Lock()
	c.cache, c.inProgress = freshCache, freshInProgress
	c.lock.Unlock()
}
// checkInProgress blocks until the given seed is no longer marked in progress
// or ctx is done, whichever happens first.
//
// It polls the in-progress map with a growing delay (starting at minDelay,
// multiplied by delayFactor after every round and capped at maxDelay) so that
// waiters do not spin. It returns ctx.Err() if the context ends while
// waiting, and nil once the seed is free.
func (c *PayloadCommitteeCache) checkInProgress(ctx context.Context, seed [32]byte) error {
	k := key(seed)
	delay := minDelay
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		c.lock.RLock()
		busy := c.inProgress[k]
		c.lock.RUnlock()
		if !busy {
			return nil
		}

		// Wait on a timer rather than time.Sleep so that a cancelled context
		// interrupts the backoff immediately instead of after the full delay.
		timer := time.NewTimer(time.Duration(delay) * time.Nanosecond)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}

		delay *= delayFactor
		delay = min(delay, maxDelay)
	}
}

View File

@@ -0,0 +1,39 @@
//go:build fuzz
// This file is used in fuzzer builds to bypass the payload committee cache.
package cache
import (
"context"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
// FakePayloadCommitteeCache is a no-op implementation of the payload committee cache for fuzz builds.
// It carries no state; all of its methods are stubs.
type FakePayloadCommitteeCache struct{}
// NewPayloadCommitteeCache returns the no-op payload committee cache used in fuzz builds.
func NewPayloadCommitteeCache() *FakePayloadCommitteeCache {
	return new(FakePayloadCommitteeCache)
}
// Get is a stub. It ignores the context and seed and always returns
// (nil, nil), so every lookup looks like a cache miss.
func (c *FakePayloadCommitteeCache) Get(_ context.Context, _ [32]byte) ([]primitives.ValidatorIndex, error) {
return nil, nil
}
// Add is a stub. It discards the seed and indices and stores nothing.
func (c *FakePayloadCommitteeCache) Add(_ [32]byte, _ []primitives.ValidatorIndex) {}
// MarkInProgress is a stub. It records nothing and always returns nil.
func (c *FakePayloadCommitteeCache) MarkInProgress(_ [32]byte) error {
return nil
}
// MarkNotInProgress is a stub. It has nothing to release and always returns nil.
func (c *FakePayloadCommitteeCache) MarkNotInProgress(_ [32]byte) error {
return nil
}
// Clear is a stub. The fake cache holds no state, so there is nothing to reset.
func (c *FakePayloadCommitteeCache) Clear() {}

View File

@@ -0,0 +1,94 @@
//go:build !fuzz
package cache
import (
"context"
"strconv"
"testing"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
// TestPayloadCommitteeCache_MissOnEmpty checks that a lookup on a freshly
// created cache yields no committee and no error.
func TestPayloadCommitteeCache_MissOnEmpty(t *testing.T) {
	var seed [32]byte
	seed[0] = 'A'

	got, err := NewPayloadCommitteeCache().Get(t.Context(), seed)
	require.NoError(t, err)
	assert.Equal(t, true, got == nil, "Expected nil on empty cache")
}
func TestPayloadCommitteeCache_AddThenHit(t *testing.T) {
c := NewPayloadCommitteeCache()
seed := [32]byte{'A'}
want := []primitives.ValidatorIndex{1, 2, 3, 4, 5}
c.Add(seed, want)
got, err := c.Get(t.Context(), seed)
require.NoError(t, err)
assert.DeepEqual(t, want, got)
}
func TestPayloadCommitteeCache_LRUEviction(t *testing.T) {
c := NewPayloadCommitteeCache()
// Fill beyond capacity.
for i := range maxPayloadCommitteeCacheSize + 10 {
s := bytesutil.ToBytes32([]byte(strconv.Itoa(i)))
c.Add(s, []primitives.ValidatorIndex{primitives.ValidatorIndex(i)})
}
// Oldest entries should be evicted.
s := bytesutil.ToBytes32([]byte(strconv.Itoa(0)))
got, err := c.Get(t.Context(), s)
require.NoError(t, err)
assert.Equal(t, true, got == nil, "Expected oldest entry to be evicted")
// Newest entry should still be present.
s = bytesutil.ToBytes32([]byte(strconv.Itoa(maxPayloadCommitteeCacheSize + 9)))
got, err = c.Get(t.Context(), s)
require.NoError(t, err)
assert.Equal(t, 1, len(got))
}
// TestPayloadCommitteeCache_CancelledContext checks that Get returns
// context.Canceled when the seed is in progress and the context is already
// cancelled.
func TestPayloadCommitteeCache_CancelledContext(t *testing.T) {
	pcc := NewPayloadCommitteeCache()
	seed := [32]byte{'A'}

	// With the seed marked in progress, Get has to wait on it.
	require.NoError(t, pcc.MarkInProgress(seed))

	cancelled, stop := context.WithCancel(t.Context())
	stop()
	_, getErr := pcc.Get(cancelled, seed)
	require.ErrorIs(t, getErr, context.Canceled)

	require.NoError(t, pcc.MarkNotInProgress(seed))
}
// TestPayloadCommitteeCache_MarkInProgressDuplicate checks that marking the
// same seed twice without releasing it yields ErrAlreadyInProgress.
func TestPayloadCommitteeCache_MarkInProgressDuplicate(t *testing.T) {
	pcc := NewPayloadCommitteeCache()
	seed := [32]byte{'A'}

	firstErr := pcc.MarkInProgress(seed)
	require.NoError(t, firstErr)

	secondErr := pcc.MarkInProgress(seed)
	assert.Equal(t, ErrAlreadyInProgress, secondErr)

	require.NoError(t, pcc.MarkNotInProgress(seed))
}
// TestPayloadCommitteeCache_Clear checks that an entry added before Clear is
// no longer returned afterwards.
func TestPayloadCommitteeCache_Clear(t *testing.T) {
	seed := [32]byte{'A'}
	pcc := NewPayloadCommitteeCache()

	pcc.Add(seed, []primitives.ValidatorIndex{1, 2, 3})
	pcc.Clear()

	afterClear, err := pcc.Get(t.Context(), seed)
	require.NoError(t, err)
	assert.Equal(t, true, afterClear == nil, "Expected nil after Clear")
}

View File

@@ -16,6 +16,7 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/requests:go_default_library",
"//beacon-chain/core/signing:go_default_library",
@@ -68,11 +69,13 @@ go_test(
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//crypto/bls/common:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/validator-client:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"slices"
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
@@ -122,6 +123,37 @@ func PayloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot pr
return nil, err
}
// Try cache, then acquire the in-progress lock. If another goroutine
// is already computing, wait for it and retry. This loop ensures that
// exactly one goroutine computes at a time, avoiding thundering herd
// when a prior computation fails without populating the cache.
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
cached, err := helpers.PayloadCommitteeFromCache(ctx, seed)
if err != nil {
return nil, err
}
if cached != nil {
return cached, nil
}
if err := helpers.MarkPayloadCommitteeInProgress(seed); err != nil {
if !errors.Is(err, cache.ErrAlreadyInProgress) {
return nil, err
}
// Another goroutine is computing. PayloadCommitteeFromCache
// will block (via checkInProgress backoff) until it finishes,
// then we loop back to check the cache and retry.
continue
}
// We own the in-progress lock.
break
}
defer func() {
_ = helpers.MarkPayloadCommitteeNotInProgress(seed)
}()
activeCount, err := helpers.ActiveValidatorCount(ctx, st, epoch)
if err != nil {
return nil, err
@@ -153,6 +185,8 @@ func PayloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot pr
}
}
helpers.AddPayloadCommittee(seed, selected)
return selected, nil
}

View File

@@ -2,7 +2,9 @@ package gloas_test
import (
"bytes"
"sync"
"testing"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas"
@@ -15,7 +17,10 @@ import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/crypto/bls/common"
"github.com/OffchainLabs/prysm/v7/crypto/hash"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
testutil "github.com/OffchainLabs/prysm/v7/testing/util"
"github.com/OffchainLabs/prysm/v7/time/slots"
@@ -303,3 +308,115 @@ func (s *validatorLookupErrState) ValidatorAtIndexReadOnly(idx primitives.Valida
}
return s.BeaconState.ValidatorAtIndexReadOnly(idx)
}
// ptcSeed mirrors the seed derivation in PayloadCommittee so the test can
// pre-mark the seed as in-progress.
//
// It takes the epoch seed for the PTC attester domain and hashes it together
// with the 8-byte encoding of the slot. The test fails immediately if the
// epoch seed cannot be obtained.
func ptcSeed(t *testing.T, st state.ReadOnlyBeaconState, slot primitives.Slot) [32]byte {
	// Attribute assertion failures to the calling test, not to this helper.
	t.Helper()

	epoch := slots.ToEpoch(slot)
	seed, err := helpers.Seed(st, epoch, params.BeaconConfig().DomainPTCAttester)
	require.NoError(t, err)
	return hash.Hash(append(seed[:], bytesutil.Bytes8(uint64(slot))...))
}
// TestPayloadCommittee_ConcurrentInProgress covers a holder of the
// in-progress lock that releases it WITHOUT populating the cache (a failed
// computation). PayloadCommittee is expected to compute the result itself
// rather than return an error.
func TestPayloadCommittee_ConcurrentInProgress(t *testing.T) {
	helpers.ClearCache()
	setupTestConfig(t)

	_, firstKey := newKey(t)
	_, secondKey := newKey(t)
	validators := []*eth.Validator{activeValidator(firstKey), activeValidator(secondKey)}
	st := newTestState(t, validators, 2)
	slot := primitives.Slot(1)
	seed := ptcSeed(t, st, slot)

	// Take the lock on behalf of an imaginary concurrent goroutine.
	require.NoError(t, helpers.MarkPayloadCommitteeInProgress(seed))

	// Drop the lock shortly afterwards, leaving the cache empty, as a
	// failed computation would.
	releaseWithoutResult := func() {
		time.Sleep(20 * time.Millisecond)
		_ = helpers.MarkPayloadCommitteeNotInProgress(seed)
	}
	go releaseWithoutResult()

	// The call has to wait for the release, find nothing cached, and compute.
	ptc, err := gloas.PayloadCommittee(t.Context(), st, slot)
	require.NoError(t, err)
	nonEmpty := len(ptc) > 0
	assert.Equal(t, true, nonEmpty, "expected non-empty PTC")
}
// TestPayloadCommittee_ConcurrentCacheHit covers a holder of the in-progress
// lock that populates the cache before releasing it. A concurrent caller is
// expected to receive that cached result.
func TestPayloadCommittee_ConcurrentCacheHit(t *testing.T) {
	helpers.ClearCache()
	setupTestConfig(t)

	_, firstKey := newKey(t)
	_, secondKey := newKey(t)
	validators := []*eth.Validator{activeValidator(firstKey), activeValidator(secondKey)}
	st := newTestState(t, validators, 2)
	slot := primitives.Slot(1)
	seed := ptcSeed(t, st, slot)

	// Obtain the reference result, then wipe the caches again.
	want, err := gloas.PayloadCommittee(t.Context(), st, slot)
	require.NoError(t, err)
	helpers.ClearCache()

	// Take the lock on behalf of an imaginary goroutine that will later
	// store the result and release.
	require.NoError(t, helpers.MarkPayloadCommitteeInProgress(seed))
	publishThenRelease := func() {
		time.Sleep(20 * time.Millisecond)
		helpers.AddPayloadCommittee(seed, want)
		_ = helpers.MarkPayloadCommitteeNotInProgress(seed)
	}
	go publishThenRelease()

	got, err := gloas.PayloadCommittee(t.Context(), st, slot)
	require.NoError(t, err)
	assert.DeepEqual(t, want, got)
}
// TestPayloadCommittee_ParallelCallers launches several concurrent calls for
// the same slot and checks that each one succeeds with the expected committee.
func TestPayloadCommittee_ParallelCallers(t *testing.T) {
	helpers.ClearCache()
	setupTestConfig(t)

	_, firstKey := newKey(t)
	_, secondKey := newKey(t)
	validators := []*eth.Validator{activeValidator(firstKey), activeValidator(secondKey)}
	st := newTestState(t, validators, 2)
	slot := primitives.Slot(1)

	// Obtain the reference result, then wipe the caches again.
	want, err := gloas.PayloadCommittee(t.Context(), st, slot)
	require.NoError(t, err)
	helpers.ClearCache()

	const numCallers = 8
	// outcome is what a single caller observed.
	type outcome struct {
		committee []primitives.ValidatorIndex
		err       error
	}
	outcomes := make([]outcome, numCallers)

	var wg sync.WaitGroup
	wg.Add(numCallers)
	for caller := 0; caller < numCallers; caller++ {
		// Each goroutine writes only to its own slot of outcomes.
		go func(idx int) {
			defer wg.Done()
			committee, callErr := gloas.PayloadCommittee(t.Context(), st, slot)
			outcomes[idx] = outcome{committee: committee, err: callErr}
		}(caller)
	}
	wg.Wait()

	for caller, seen := range outcomes {
		require.NoError(t, seen.err, "caller %d returned error", caller)
		assert.DeepEqual(t, want, seen.committee, "caller %d got wrong result", caller)
	}
}

View File

@@ -27,8 +27,9 @@ import (
)
var (
committeeCache = cache.NewCommitteesCache()
proposerIndicesCache = cache.NewProposerIndicesCache()
committeeCache = cache.NewCommitteesCache()
proposerIndicesCache = cache.NewProposerIndicesCache()
payloadCommitteeCache = cache.NewPayloadCommitteeCache()
)
type beaconCommitteeFunc = func(
@@ -613,6 +614,28 @@ func ClearCache() {
proposerIndicesCache.Prune(0)
syncCommitteeCache.Clear()
balanceCache.Clear()
payloadCommitteeCache.Clear()
}
// PayloadCommitteeFromCache returns the cached payload committee for the given seed.
// Returns nil on cache miss.
// It delegates to Get on the package-level payloadCommitteeCache and passes
// ctx through unchanged.
func PayloadCommitteeFromCache(ctx context.Context, seed [32]byte) ([]primitives.ValidatorIndex, error) {
return payloadCommitteeCache.Get(ctx, seed)
}
// AddPayloadCommittee stores a payload committee result in the cache.
// It delegates to Add on the package-level payloadCommitteeCache, using seed as the key.
func AddPayloadCommittee(seed [32]byte, indices []primitives.ValidatorIndex) {
payloadCommitteeCache.Add(seed, indices)
}
// MarkPayloadCommitteeInProgress marks a payload committee seed as being computed.
// It returns whatever MarkInProgress on the package-level payloadCommitteeCache returns.
func MarkPayloadCommitteeInProgress(seed [32]byte) error {
return payloadCommitteeCache.MarkInProgress(seed)
}
// MarkPayloadCommitteeNotInProgress releases the in-progress lock on a payload committee seed.
// It returns whatever MarkNotInProgress on the package-level payloadCommitteeCache returns.
func MarkPayloadCommitteeNotInProgress(seed [32]byte) error {
return payloadCommitteeCache.MarkNotInProgress(seed)
}
// ComputeCommittee returns the requested shuffled committee out of the total committees using

View File

@@ -0,0 +1,3 @@
### Added
- Add caching for PayloadCommittee computation to avoid redundant calculations

View File

@@ -84,6 +84,12 @@ var metricComparisonTests = []comparisonTest{
topic2: "hot_state_cache_hit",
expectedComparison: 0.01,
},
{
name: "payload committee cache",
topic1: "payload_committee_cache_miss",
topic2: "payload_committee_cache_hit",
expectedComparison: 0.01,
},
}
func metricsTest(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
@@ -196,7 +202,7 @@ func metricCheckComparison(pageContent, topic1, topic2 string, comparison float6
}
func valueOfTopic(pageContent, topic string) (int, error) {
regexExp, err := regexp.Compile(topic + " ")
regexExp, err := regexp.Compile(`(?:^|\s)` + topic + " ")
if err != nil {
return -1, errors.Wrap(err, "could not create regex expression")
}