Compare commits

...

2 Commits

Author SHA1 Message Date
terence tsao
f08d17a71c Use forkchoice store 2024-01-12 14:27:03 -08:00
terence
9e7eac90d3 Revert "Use a cache of one entry to build attestation" 2024-01-12 14:10:57 -08:00
12 changed files with 548 additions and 248 deletions

View File

@@ -93,7 +93,6 @@ go_test(
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_google_gofuzz//:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)

View File

@@ -1,41 +1,171 @@
package cache
import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"k8s.io/client-go/tools/cache"
)
type AttestationConsensusData struct {
Slot primitives.Slot
HeadRoot []byte
Target forkchoicetypes.Checkpoint
Source forkchoicetypes.Checkpoint
}
var (
// Delay parameters
minDelay = float64(10) // 10 nanoseconds
maxDelay = float64(100000000) // 0.1 second
delayFactor = 1.1
// AttestationCache stores cached results of AttestationData requests.
// Metrics
attestationCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_cache_miss",
Help: "The number of attestation data requests that aren't present in the cache.",
})
attestationCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_cache_hit",
Help: "The number of attestation data requests that are present in the cache.",
})
attestationCacheSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "attestation_cache_size",
Help: "The number of attestation data in the attestations cache",
})
)
// ErrAlreadyInProgress appears when attempting to mark a cache as in progress while it is
// already in progress. The client should handle this error and wait for the in progress
// data to resolve via Get.
var ErrAlreadyInProgress = errors.New("already in progress")
// AttestationCache is used to store the cached results of an AttestationData request.
type AttestationCache struct {
a *AttestationConsensusData
sync.RWMutex
cache *cache.FIFO
lock sync.RWMutex
inProgress map[string]bool
}
// NewAttestationCache creates a new instance of AttestationCache.
// NewAttestationCache initializes the map and underlying cache.
func NewAttestationCache() *AttestationCache {
return &AttestationCache{}
}
// Get retrieves cached attestation data, recording a cache hit or miss. This method is lock free.
func (c *AttestationCache) Get() *AttestationConsensusData {
return c.a
}
// Put adds a response to the cache. This method is lock free.
func (c *AttestationCache) Put(a *AttestationConsensusData) error {
if a == nil {
return errors.New("attestation cannot be nil")
return &AttestationCache{
cache: cache.NewFIFO(wrapperToKey),
inProgress: make(map[string]bool),
}
c.a = a
}
// Get waits for any in progress calculation to complete before returning a
// cached response, if any.
func (c *AttestationCache) Get(ctx context.Context, req *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
if req == nil {
return nil, errors.New("nil attestation data request")
}
s, e := reqToKey(req)
if e != nil {
return nil, e
}
delay := minDelay
// Another identical request may be in progress already. Let's wait until
// any in progress request resolves or our timeout is exceeded.
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
c.lock.RLock()
if !c.inProgress[s] {
c.lock.RUnlock()
break
}
c.lock.RUnlock()
// This increasing backoff is to decrease the CPU cycles while waiting
// for the in progress boolean to flip to false.
time.Sleep(time.Duration(delay) * time.Nanosecond)
delay *= delayFactor
delay = math.Min(delay, maxDelay)
}
item, exists, err := c.cache.GetByKey(s)
if err != nil {
return nil, err
}
if exists && item != nil && item.(*attestationReqResWrapper).res != nil {
attestationCacheHit.Inc()
return ethpb.CopyAttestationData(item.(*attestationReqResWrapper).res), nil
}
attestationCacheMiss.Inc()
return nil, nil
}
// MarkInProgress a request so that any other similar requests will block on
// Get until MarkNotInProgress is called.
func (c *AttestationCache) MarkInProgress(req *ethpb.AttestationDataRequest) error {
c.lock.Lock()
defer c.lock.Unlock()
s, e := reqToKey(req)
if e != nil {
return e
}
if c.inProgress[s] {
return ErrAlreadyInProgress
}
c.inProgress[s] = true
return nil
}
// MarkNotInProgress will release the lock on a given request. This should be
// called after put.
func (c *AttestationCache) MarkNotInProgress(req *ethpb.AttestationDataRequest) error {
c.lock.Lock()
defer c.lock.Unlock()
s, e := reqToKey(req)
if e != nil {
return e
}
delete(c.inProgress, s)
return nil
}
// Put the response in the cache.
func (c *AttestationCache) Put(_ context.Context, req *ethpb.AttestationDataRequest, res *ethpb.AttestationData) error {
data := &attestationReqResWrapper{
req,
res,
}
if err := c.cache.AddIfNotPresent(data); err != nil {
return err
}
trim(c.cache, maxCacheSize)
attestationCacheSize.Set(float64(len(c.cache.List())))
return nil
}
func wrapperToKey(i interface{}) (string, error) {
w, ok := i.(*attestationReqResWrapper)
if !ok {
return "", errors.New("key is not of type *attestationReqResWrapper")
}
if w == nil {
return "", errors.New("nil wrapper")
}
if w.req == nil {
return "", errors.New("nil wrapper.request")
}
return reqToKey(w.req)
}
func reqToKey(req *ethpb.AttestationDataRequest) (string, error) {
return fmt.Sprintf("%d", req.Slot), nil
}
type attestationReqResWrapper struct {
req *ethpb.AttestationDataRequest
res *ethpb.AttestationData
}

View File

@@ -1,53 +1,41 @@
package cache_test
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
"github.com/stretchr/testify/require"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"google.golang.org/protobuf/proto"
)
func TestAttestationCache_RoundTrip(t *testing.T) {
ctx := context.Background()
c := cache.NewAttestationCache()
a := c.Get()
require.Nil(t, a)
insert := &cache.AttestationConsensusData{
Slot: 1,
HeadRoot: []byte{1},
Target: forkchoicetypes.Checkpoint{
Epoch: 2,
Root: [32]byte{3},
},
Source: forkchoicetypes.Checkpoint{
Epoch: 4,
Root: [32]byte{5},
},
}
err := c.Put(insert)
require.NoError(t, err)
a = c.Get()
require.Equal(t, insert, a)
insert = &cache.AttestationConsensusData{
Slot: 6,
HeadRoot: []byte{7},
Target: forkchoicetypes.Checkpoint{
Epoch: 8,
Root: [32]byte{9},
},
Source: forkchoicetypes.Checkpoint{
Epoch: 10,
Root: [32]byte{11},
},
req := &ethpb.AttestationDataRequest{
CommitteeIndex: 0,
Slot: 1,
}
err = c.Put(insert)
require.NoError(t, err)
response, err := c.Get(ctx, req)
assert.NoError(t, err)
assert.Equal(t, (*ethpb.AttestationData)(nil), response)
a = c.Get()
require.Equal(t, insert, a)
assert.NoError(t, c.MarkInProgress(req))
res := &ethpb.AttestationData{
Target: &ethpb.Checkpoint{Epoch: 5, Root: make([]byte, 32)},
}
assert.NoError(t, c.Put(ctx, req, res))
assert.NoError(t, c.MarkNotInProgress(req))
response, err = c.Get(ctx, req)
assert.NoError(t, err)
if !proto.Equal(response, res) {
t.Error("Expected equal protos to return from cache")
}
}

View File

@@ -1,9 +1,16 @@
package cache
import (
"github.com/prysmaticlabs/prysm/v4/config/params"
"k8s.io/client-go/tools/cache"
)
var (
// maxCacheSize is 4x of the epoch length for additional cache padding.
// Requests should be only accessing committees within defined epoch length.
maxCacheSize = uint64(4 * params.BeaconConfig().SlotsPerEpoch)
)
// trim the FIFO queue to the maxSize.
func trim(queue *cache.FIFO, maxSize uint64) {
for s := uint64(len(queue.ListKeys())); s > maxSize; s-- {

View File

@@ -14,9 +14,4 @@ var (
errNotSyncCommitteeIndexPosition = errors.New("not syncCommitteeIndexPosition struct")
// ErrNotFoundRegistration when validator registration does not exist in cache.
ErrNotFoundRegistration = errors.Wrap(ErrNotFound, "no validator registered")
// ErrAlreadyInProgress appears when attempting to mark a cache as in progress while it is
// already in progress. The client should handle this error and wait for the in progress
// data to resolve via Get.
ErrAlreadyInProgress = errors.New("already in progress")
)

View File

@@ -15,11 +15,6 @@ import (
)
var (
// Delay parameters
minDelay = float64(10) // 10 nanoseconds
maxDelay = float64(100000000) // 0.1 second
delayFactor = 1.1
// Metrics
skipSlotCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "skip_slot_cache_hit",

View File

@@ -20,7 +20,6 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/forkchoice/types:go_default_library",
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/state:go_default_library",

View File

@@ -16,7 +16,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
beaconState "github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
@@ -315,9 +314,6 @@ func (s *Service) AggregatedSigAndAggregationBits(
func (s *Service) GetAttestationData(
ctx context.Context, req *ethpb.AttestationDataRequest,
) (*ethpb.AttestationData, *RpcError) {
if req.Slot != s.GenesisTimeFetcher.CurrentSlot() {
return nil, &RpcError{Reason: BadRequest, Err: errors.Errorf("invalid request: slot %d is not the current slot %d", req.Slot, s.GenesisTimeFetcher.CurrentSlot())}
}
if err := helpers.ValidateAttestationTime(
req.Slot,
s.GenesisTimeFetcher.GenesisTime(),
@@ -326,49 +322,35 @@ func (s *Service) GetAttestationData(
return nil, &RpcError{Reason: BadRequest, Err: errors.Errorf("invalid request: %v", err)}
}
s.AttestationCache.RLock()
res := s.AttestationCache.Get()
if res != nil && res.Slot == req.Slot {
s.AttestationCache.RUnlock()
return &ethpb.AttestationData{
Slot: res.Slot,
CommitteeIndex: req.CommitteeIndex,
BeaconBlockRoot: res.HeadRoot,
Source: &ethpb.Checkpoint{
Epoch: res.Source.Epoch,
Root: res.Source.Root[:],
},
Target: &ethpb.Checkpoint{
Epoch: res.Target.Epoch,
Root: res.Target.Root[:],
},
}, nil
res, err := s.AttestationCache.Get(ctx, req)
if err != nil {
return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve data from attestation cache: %v", err)}
}
s.AttestationCache.RUnlock()
s.AttestationCache.Lock()
defer s.AttestationCache.Unlock()
// We check the cache again as in the event there are multiple inflight requests for
// the same attestation data, the cache might have been filled while we were waiting
// to acquire the lock.
res = s.AttestationCache.Get()
if res != nil && res.Slot == req.Slot {
return &ethpb.AttestationData{
Slot: res.Slot,
CommitteeIndex: req.CommitteeIndex,
BeaconBlockRoot: res.HeadRoot,
Source: &ethpb.Checkpoint{
Epoch: res.Source.Epoch,
Root: res.Source.Root[:],
},
Target: &ethpb.Checkpoint{
Epoch: res.Target.Epoch,
Root: res.Target.Root[:],
},
}, nil
if res != nil {
res.CommitteeIndex = req.CommitteeIndex
return res, nil
}
if err := s.AttestationCache.MarkInProgress(req); err != nil {
if errors.Is(err, cache.ErrAlreadyInProgress) {
res, err := s.AttestationCache.Get(ctx, req)
if err != nil {
return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve data from attestation cache: %v", err)}
}
if res == nil {
return nil, &RpcError{Reason: Internal, Err: errors.New("a request was in progress and resolved to nil")}
}
res.CommitteeIndex = req.CommitteeIndex
return res, nil
}
return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not mark attestation as in-progress: %v", err)}
}
defer func() {
if err := s.AttestationCache.MarkNotInProgress(req); err != nil {
log.WithError(err).Error("could not mark attestation as not-in-progress")
}
}()
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
if err != nil {
return nil, &RpcError{Reason: Internal, Err: errors.Wrap(err, "could not get head root")}
@@ -379,34 +361,25 @@ func (s *Service) GetAttestationData(
return nil, &RpcError{Reason: Internal, Err: errors.Wrap(err, "could not get target root")}
}
justifiedCheckpoint := s.FinalizedFetcher.CurrentJustifiedCheckpt()
if err = s.AttestationCache.Put(&cache.AttestationConsensusData{
Slot: req.Slot,
HeadRoot: headRoot,
Target: forkchoicetypes.Checkpoint{
Epoch: targetEpoch,
Root: targetRoot,
},
Source: forkchoicetypes.Checkpoint{
Epoch: justifiedCheckpoint.Epoch,
Root: bytesutil.ToBytes32(justifiedCheckpoint.Root),
},
}); err != nil {
log.WithError(err).Error("Failed to put attestation data into cache")
if justifiedCheckpoint == nil {
return nil, &RpcError{Reason: Internal, Err: errors.New("could not get justified checkpoint")}
}
return &ethpb.AttestationData{
res = &ethpb.AttestationData{
Slot: req.Slot,
CommitteeIndex: req.CommitteeIndex,
BeaconBlockRoot: headRoot,
Source: &ethpb.Checkpoint{
Epoch: justifiedCheckpoint.Epoch,
Root: justifiedCheckpoint.Root,
},
Source: justifiedCheckpoint,
Target: &ethpb.Checkpoint{
Epoch: targetEpoch,
Root: targetRoot[:],
},
}, nil
}
if err := s.AttestationCache.Put(ctx, req, res); err != nil {
log.WithError(err).Error("could not store attestation data in cache")
}
return res, nil
}
// SubmitSyncMessage submits the sync committee message to the network.

View File

@@ -76,6 +76,7 @@ go_test(
"//beacon-chain/rpc/eth/shared/testing:go_default_library",
"//beacon-chain/rpc/testutil:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//config/fieldparams:go_default_library",

View File

@@ -9,6 +9,7 @@ import (
"net/http/httptest"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -29,6 +30,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/testutil"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
@@ -811,18 +813,29 @@ func TestGetAttestationData(t *testing.T) {
require.NoError(t, err, "Could not hash beacon block")
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for justified block")
targetRoot, err := targetBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for target block")
slot := 3*params.BeaconConfig().SlotsPerEpoch + 1
justifiedCheckpoint := &ethpbalpha.Checkpoint{
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, beaconState.SetSlot(slot))
err = beaconState.SetCurrentJustifiedCheckpoint(&ethpbalpha.Checkpoint{
Epoch: 2,
Root: justifiedRoot[:],
}
})
require.NoError(t, err)
blockRoots := beaconState.BlockRoots()
blockRoots[1] = blockRoot[:]
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
chain := &mockChain.ChainService{
Optimistic: false,
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
Root: blockRoot[:],
CurrentJustifiedCheckPoint: justifiedCheckpoint,
TargetRoot: blockRoot,
Optimistic: false,
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
State: beaconState,
Root: blockRoot[:],
}
s := &Server{
@@ -831,10 +844,9 @@ func TestGetAttestationData(t *testing.T) {
TimeFetcher: chain,
OptimisticModeFetcher: chain,
CoreService: &core.Service{
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: chain,
GenesisTimeFetcher: chain,
FinalizedFetcher: chain,
AttestationCache: cache.NewAttestationCache(),
},
}
@@ -902,10 +914,9 @@ func TestGetAttestationData(t *testing.T) {
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
chain := &mockChain.ChainService{
Optimistic: true,
State: beaconState,
Genesis: time.Now(),
CurrentJustifiedCheckPoint: &ethpbalpha.Checkpoint{},
Optimistic: true,
State: beaconState,
Genesis: time.Now(),
}
s := &Server{
@@ -917,7 +928,6 @@ func TestGetAttestationData(t *testing.T) {
AttestationCache: cache.NewAttestationCache(),
GenesisTimeFetcher: chain,
HeadFetcher: chain,
FinalizedFetcher: chain,
},
}
@@ -944,13 +954,97 @@ func TestGetAttestationData(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
})
t.Run("handles in progress request", func(t *testing.T) {
state, err := state_native.InitializeFromProtoPhase0(&ethpbalpha.BeaconState{Slot: 100})
require.NoError(t, err)
ctx := context.Background()
slot := primitives.Slot(2)
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
chain := &mockChain.ChainService{
Optimistic: false,
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
State: state,
}
s := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: false},
HeadFetcher: chain,
TimeFetcher: chain,
OptimisticModeFetcher: chain,
CoreService: &core.Service{
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: chain,
GenesisTimeFetcher: chain,
},
}
expectedResponse := &GetAttestationDataResponse{
Data: &shared.AttestationData{
Slot: strconv.FormatUint(uint64(slot), 10),
CommitteeIndex: strconv.FormatUint(1, 10),
BeaconBlockRoot: hexutil.Encode(make([]byte, 32)),
Source: &shared.Checkpoint{
Epoch: strconv.FormatUint(42, 10),
Root: hexutil.Encode(make([]byte, 32)),
},
Target: &shared.Checkpoint{
Epoch: strconv.FormatUint(55, 10),
Root: hexutil.Encode(make([]byte, 32)),
},
},
}
expectedResponsePb := &ethpbalpha.AttestationData{
Slot: slot,
CommitteeIndex: 1,
BeaconBlockRoot: make([]byte, 32),
Source: &ethpbalpha.Checkpoint{Epoch: 42, Root: make([]byte, 32)},
Target: &ethpbalpha.Checkpoint{Epoch: 55, Root: make([]byte, 32)},
}
url := fmt.Sprintf("http://example.com?slot=%d&committee_index=%d", slot, 1)
request := httptest.NewRequest(http.MethodGet, url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
requestPb := &ethpbalpha.AttestationDataRequest{
CommitteeIndex: 1,
Slot: slot,
}
require.NoError(t, s.CoreService.AttestationCache.MarkInProgress(requestPb))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
s.GetAttestationData(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
resp := &GetAttestationDataResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
require.NotNil(t, resp)
assert.DeepEqual(t, expectedResponse, resp)
}()
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.CoreService.AttestationCache.Put(ctx, requestPb, expectedResponsePb))
assert.NoError(t, s.CoreService.AttestationCache.MarkNotInProgress(requestPb))
}()
wg.Wait()
})
t.Run("invalid slot", func(t *testing.T) {
slot := 3*params.BeaconConfig().SlotsPerEpoch + 1
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
chain := &mockChain.ChainService{
Optimistic: false,
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
CurrentJustifiedCheckPoint: &ethpbalpha.Checkpoint{},
Optimistic: false,
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
}
s := &Server{
@@ -960,7 +1054,6 @@ func TestGetAttestationData(t *testing.T) {
OptimisticModeFetcher: chain,
CoreService: &core.Service{
GenesisTimeFetcher: chain,
FinalizedFetcher: chain,
},
}
@@ -978,7 +1071,7 @@ func TestGetAttestationData(t *testing.T) {
assert.Equal(t, true, strings.Contains(e.Message, "invalid request"))
})
t.Run("request slot is not current slot", func(t *testing.T) {
t.Run("head state slot greater than request slot", func(t *testing.T) {
ctx := context.Background()
db := dbutil.SetupDB(t)
@@ -998,17 +1091,37 @@ func TestGetAttestationData(t *testing.T) {
util.SaveBlock(t, ctx, db, block2)
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for justified block")
justifiedCheckpoint := &ethpbalpha.Checkpoint{
targetRoot, err := targetBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for target block")
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, beaconState.SetSlot(slot))
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix()-offset)))
err = beaconState.SetLatestBlockHeader(util.HydrateBeaconHeader(&ethpbalpha.BeaconBlockHeader{
ParentRoot: blockRoot2[:],
}))
require.NoError(t, err)
err = beaconState.SetCurrentJustifiedCheckpoint(&ethpbalpha.Checkpoint{
Epoch: 2,
Root: justifiedRoot[:],
}
})
require.NoError(t, err)
blockRoots := beaconState.BlockRoots()
blockRoots[1] = blockRoot[:]
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
blockRoots[3*params.BeaconConfig().SlotsPerEpoch] = blockRoot2[:]
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
beaconstate := beaconState.Copy()
require.NoError(t, beaconstate.SetSlot(beaconstate.Slot()-1))
require.NoError(t, db.SaveState(ctx, beaconstate, blockRoot2))
chain := &mockChain.ChainService{
Root: blockRoot[:],
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
CurrentJustifiedCheckPoint: justifiedCheckpoint,
TargetRoot: blockRoot2,
State: beaconState,
Root: blockRoot[:],
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
}
s := &Server{
@@ -1017,20 +1130,45 @@ func TestGetAttestationData(t *testing.T) {
TimeFetcher: chain,
OptimisticModeFetcher: chain,
CoreService: &core.Service{
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: chain,
GenesisTimeFetcher: chain,
StateGen: stategen.New(db, doublylinkedtree.New()),
FinalizedFetcher: chain,
},
}
require.NoError(t, db.SaveState(ctx, beaconState, blockRoot))
util.SaveBlock(t, ctx, db, block)
require.NoError(t, db.SaveHeadBlockRoot(ctx, blockRoot))
url := fmt.Sprintf("http://example.com?slot=%d&committee_index=%d", slot-1, 0)
request := httptest.NewRequest(http.MethodGet, url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.GetAttestationData(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
expectedResponse := &GetAttestationDataResponse{
Data: &shared.AttestationData{
Slot: strconv.FormatUint(uint64(slot-1), 10),
CommitteeIndex: strconv.FormatUint(0, 10),
BeaconBlockRoot: hexutil.Encode(blockRoot2[:]),
Source: &shared.Checkpoint{
Epoch: strconv.FormatUint(2, 10),
Root: hexutil.Encode(justifiedRoot[:]),
},
Target: &shared.Checkpoint{
Epoch: strconv.FormatUint(3, 10),
Root: hexutil.Encode(blockRoot2[:]),
},
},
}
assert.Equal(t, http.StatusOK, writer.Code)
resp := &GetAttestationDataResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
require.NotNil(t, resp)
assert.DeepEqual(t, expectedResponse, resp)
})
t.Run("succeeds in first epoch", func(t *testing.T) {
@@ -1045,18 +1183,27 @@ func TestGetAttestationData(t *testing.T) {
require.NoError(t, err, "Could not hash beacon block")
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for justified block")
targetRoot, err := targetBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for target block")
justifiedCheckpt := &ethpbalpha.Checkpoint{
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, beaconState.SetSlot(slot))
err = beaconState.SetCurrentJustifiedCheckpoint(&ethpbalpha.Checkpoint{
Epoch: 0,
Root: justifiedRoot[:],
}
})
require.NoError(t, err)
blockRoots := beaconState.BlockRoots()
blockRoots[1] = blockRoot[:]
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
chain := &mockChain.ChainService{
Root: blockRoot[:],
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
CurrentJustifiedCheckPoint: justifiedCheckpt,
TargetRoot: blockRoot,
State: beaconState,
Root: blockRoot[:],
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
}
s := &Server{
@@ -1068,7 +1215,6 @@ func TestGetAttestationData(t *testing.T) {
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: chain,
GenesisTimeFetcher: chain,
FinalizedFetcher: chain,
},
}
@@ -1132,17 +1278,28 @@ func TestGetAttestationData(t *testing.T) {
require.NoError(t, err, "Could not hash beacon block")
justifiedBlockRoot, err := justifiedBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not hash justified block")
justifiedCheckpt := &ethpbalpha.Checkpoint{
epochBoundaryRoot, err := epochBoundaryBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not hash justified block")
slot := primitives.Slot(10000)
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, beaconState.SetSlot(slot))
err = beaconState.SetCurrentJustifiedCheckpoint(&ethpbalpha.Checkpoint{
Epoch: slots.ToEpoch(1500),
Root: justifiedBlockRoot[:],
}
slot := primitives.Slot(10000)
})
require.NoError(t, err)
blockRoots := beaconState.BlockRoots()
blockRoots[1] = blockRoot[:]
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = epochBoundaryRoot[:]
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedBlockRoot[:]
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
chain := &mockChain.ChainService{
Root: blockRoot[:],
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
CurrentJustifiedCheckPoint: justifiedCheckpt,
TargetRoot: blockRoot,
State: beaconState,
Root: blockRoot[:],
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
}
s := &Server{
@@ -1154,7 +1311,6 @@ func TestGetAttestationData(t *testing.T) {
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: chain,
GenesisTimeFetcher: chain,
FinalizedFetcher: chain,
},
}

View File

@@ -48,13 +48,23 @@ func TestAttestationDataAtSlot_HandlesFarAwayJustifiedEpoch(t *testing.T) {
require.NoError(t, err, "Could not hash beacon block")
justifiedBlockRoot, err := justifiedBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not hash justified block")
epochBoundaryRoot, err := epochBoundaryBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not hash justified block")
slot := primitives.Slot(10000)
justifiedCheckpoint := &ethpb.Checkpoint{
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, beaconState.SetSlot(slot))
err = beaconState.SetCurrentJustifiedCheckpoint(&ethpb.Checkpoint{
Epoch: slots.ToEpoch(1500),
Root: justifiedBlockRoot[:],
}
})
require.NoError(t, err)
slot := primitives.Slot(10000)
blockRoots := beaconState.BlockRoots()
blockRoots[1] = blockRoot[:]
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = epochBoundaryRoot[:]
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedBlockRoot[:]
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
attesterServer := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: false},
@@ -62,9 +72,8 @@ func TestAttestationDataAtSlot_HandlesFarAwayJustifiedEpoch(t *testing.T) {
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
CoreService: &core.Service{
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: &mock.ChainService{TargetRoot: blockRoot, Root: blockRoot[:]},
HeadFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:]},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
},
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/v4/config/params"
@@ -146,63 +147,6 @@ func TestGetAttestationData_OK(t *testing.T) {
}
}
func BenchmarkGetAttestationDataConcurrent(b *testing.B) {
block := util.NewBeaconBlock()
block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1
targetBlock := util.NewBeaconBlock()
targetBlock.Block.Slot = 1 * params.BeaconConfig().SlotsPerEpoch
targetRoot, err := targetBlock.Block.HashTreeRoot()
require.NoError(b, err, "Could not get signing root for target block")
justifiedBlock := util.NewBeaconBlock()
justifiedBlock.Block.Slot = 2 * params.BeaconConfig().SlotsPerEpoch
blockRoot, err := block.Block.HashTreeRoot()
require.NoError(b, err, "Could not hash beacon block")
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
require.NoError(b, err, "Could not get signing root for justified block")
slot := 3*params.BeaconConfig().SlotsPerEpoch + 1
justifiedCheckpoint := &ethpb.Checkpoint{
Epoch: 2,
Root: justifiedRoot[:],
}
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
attesterServer := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: false},
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
CoreService: &core.Service{
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: &mock.ChainService{TargetRoot: targetRoot, Root: blockRoot[:]},
GenesisTimeFetcher: &mock.ChainService{
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
},
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
},
}
req := &ethpb.AttestationDataRequest{
CommitteeIndex: 0,
Slot: 3*params.BeaconConfig().SlotsPerEpoch + 1,
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
wg.Add(5000) // for 5000 concurrent accesses
for j := 0; j < 5000; j++ {
go func() {
defer wg.Done()
_, err := attesterServer.GetAttestationData(context.Background(), req)
require.NoError(b, err, "Could not get attestation info at slot")
}()
}
wg.Wait() // Wait for all goroutines to finish
}
b.Log("Elapsed time:", b.Elapsed())
}
func TestGetAttestationData_SyncNotReady(t *testing.T) {
as := Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
@@ -250,6 +194,59 @@ func TestGetAttestationData_Optimistic(t *testing.T) {
require.NoError(t, err)
}
func TestAttestationDataSlot_handlesInProgressRequest(t *testing.T) {
s := &ethpb.BeaconState{Slot: 100}
state, err := state_native.InitializeFromProtoPhase0(s)
require.NoError(t, err)
ctx := context.Background()
slot := primitives.Slot(2)
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
server := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: false},
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
CoreService: &core.Service{
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: &mock.ChainService{State: state},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
},
}
req := &ethpb.AttestationDataRequest{
CommitteeIndex: 1,
Slot: slot,
}
res := &ethpb.AttestationData{
CommitteeIndex: 1,
Target: &ethpb.Checkpoint{Epoch: 55, Root: make([]byte, 32)},
}
require.NoError(t, server.CoreService.AttestationCache.MarkInProgress(req))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
response, err := server.GetAttestationData(ctx, req)
require.NoError(t, err)
if !proto.Equal(res, response) {
t.Error("Expected equal responses from cache")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, server.CoreService.AttestationCache.Put(ctx, req, res))
assert.NoError(t, server.CoreService.AttestationCache.MarkNotInProgress(req))
}()
wg.Wait()
}
func TestServer_GetAttestationData_InvalidRequestSlot(t *testing.T) {
ctx := context.Background()
@@ -271,7 +268,11 @@ func TestServer_GetAttestationData_InvalidRequestSlot(t *testing.T) {
assert.ErrorContains(t, "invalid request", err)
}
func TestServer_GetAttestationData_RequestSlotIsDifferentThanCurrentSlot(t *testing.T) {
func TestServer_GetAttestationData_HeadStateSlotGreaterThanRequestSlot(t *testing.T) {
// There exists a rare scenario where the validator may request an attestation for a slot less
// than the head state's slot. The Ethereum consensus spec constraints require the block root the
// attestation is referencing be less than or equal to the attestation data slot.
// See: https://github.com/prysmaticlabs/prysm/issues/5164
ctx := context.Background()
db := dbutil.SetupDB(t)
@@ -291,30 +292,77 @@ func TestServer_GetAttestationData_RequestSlotIsDifferentThanCurrentSlot(t *test
util.SaveBlock(t, ctx, db, block2)
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for justified block")
targetRoot, err := targetBlock.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root for target block")
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, beaconState.SetSlot(slot))
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix()-offset)))
err = beaconState.SetLatestBlockHeader(util.HydrateBeaconHeader(&ethpb.BeaconBlockHeader{
ParentRoot: blockRoot2[:],
}))
require.NoError(t, err)
err = beaconState.SetCurrentJustifiedCheckpoint(&ethpb.Checkpoint{
Epoch: 2,
Root: justifiedRoot[:],
})
require.NoError(t, err)
blockRoots := beaconState.BlockRoots()
blockRoots[1] = blockRoot[:]
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
blockRoots[3*params.BeaconConfig().SlotsPerEpoch] = blockRoot2[:]
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
beaconstate := beaconState.Copy()
require.NoError(t, beaconstate.SetSlot(beaconstate.Slot()-1))
require.NoError(t, db.SaveState(ctx, beaconstate, blockRoot2))
offset = int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
justifiedCheckpoint := &ethpb.Checkpoint{
Epoch: 2,
Root: justifiedRoot[:],
}
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
attesterServer := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: false},
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{TargetRoot: blockRoot2, Root: blockRoot[:]},
AttestationCache: cache.NewAttestationCache(),
HeadFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:], TargetRoot: blockRoot2},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
StateGen: stategen.New(db, doublylinkedtree.New()),
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
},
}
require.NoError(t, db.SaveState(ctx, beaconState, blockRoot))
util.SaveBlock(t, ctx, db, block)
require.NoError(t, db.SaveHeadBlockRoot(ctx, blockRoot))
req := &ethpb.AttestationDataRequest{
CommitteeIndex: 0,
Slot: slot - 1,
}
_, err = attesterServer.GetAttestationData(ctx, req)
require.ErrorContains(t, "invalid request: slot 24 is not the current slot 25", err)
res, err := attesterServer.GetAttestationData(ctx, req)
require.NoError(t, err, "Could not get attestation info at slot")
expectedInfo := &ethpb.AttestationData{
Slot: slot - 1,
BeaconBlockRoot: blockRoot[:],
Source: &ethpb.Checkpoint{
Epoch: 2,
Root: justifiedRoot[:],
},
Target: &ethpb.Checkpoint{
Epoch: 3,
Root: blockRoot2[:],
},
}
if !proto.Equal(res, expectedInfo) {
t.Errorf("Expected attestation info to match, received %v, wanted %v", res, expectedInfo)
}
}
func TestGetAttestationData_SucceedsInFirstEpoch(t *testing.T) {