mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
2 Commits
ba2333069a
...
revert-133
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f08d17a71c | ||
|
|
9e7eac90d3 |
1
beacon-chain/cache/BUILD.bazel
vendored
1
beacon-chain/cache/BUILD.bazel
vendored
@@ -93,7 +93,6 @@ go_test(
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_google_gofuzz//:go_default_library",
|
||||
"@com_github_stretchr_testify//require:go_default_library",
|
||||
"@org_golang_google_protobuf//proto:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
180
beacon-chain/cache/attestation_data.go
vendored
180
beacon-chain/cache/attestation_data.go
vendored
@@ -1,41 +1,171 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type AttestationConsensusData struct {
|
||||
Slot primitives.Slot
|
||||
HeadRoot []byte
|
||||
Target forkchoicetypes.Checkpoint
|
||||
Source forkchoicetypes.Checkpoint
|
||||
}
|
||||
var (
|
||||
// Delay parameters
|
||||
minDelay = float64(10) // 10 nanoseconds
|
||||
maxDelay = float64(100000000) // 0.1 second
|
||||
delayFactor = 1.1
|
||||
|
||||
// AttestationCache stores cached results of AttestationData requests.
|
||||
// Metrics
|
||||
attestationCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "attestation_cache_miss",
|
||||
Help: "The number of attestation data requests that aren't present in the cache.",
|
||||
})
|
||||
attestationCacheHit = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "attestation_cache_hit",
|
||||
Help: "The number of attestation data requests that are present in the cache.",
|
||||
})
|
||||
attestationCacheSize = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "attestation_cache_size",
|
||||
Help: "The number of attestation data in the attestations cache",
|
||||
})
|
||||
)
|
||||
|
||||
// ErrAlreadyInProgress appears when attempting to mark a cache as in progress while it is
|
||||
// already in progress. The client should handle this error and wait for the in progress
|
||||
// data to resolve via Get.
|
||||
var ErrAlreadyInProgress = errors.New("already in progress")
|
||||
|
||||
// AttestationCache is used to store the cached results of an AttestationData request.
|
||||
type AttestationCache struct {
|
||||
a *AttestationConsensusData
|
||||
sync.RWMutex
|
||||
cache *cache.FIFO
|
||||
lock sync.RWMutex
|
||||
inProgress map[string]bool
|
||||
}
|
||||
|
||||
// NewAttestationCache creates a new instance of AttestationCache.
|
||||
// NewAttestationCache initializes the map and underlying cache.
|
||||
func NewAttestationCache() *AttestationCache {
|
||||
return &AttestationCache{}
|
||||
}
|
||||
|
||||
// Get retrieves cached attestation data, recording a cache hit or miss. This method is lock free.
|
||||
func (c *AttestationCache) Get() *AttestationConsensusData {
|
||||
return c.a
|
||||
}
|
||||
|
||||
// Put adds a response to the cache. This method is lock free.
|
||||
func (c *AttestationCache) Put(a *AttestationConsensusData) error {
|
||||
if a == nil {
|
||||
return errors.New("attestation cannot be nil")
|
||||
return &AttestationCache{
|
||||
cache: cache.NewFIFO(wrapperToKey),
|
||||
inProgress: make(map[string]bool),
|
||||
}
|
||||
c.a = a
|
||||
}
|
||||
|
||||
// Get waits for any in progress calculation to complete before returning a
|
||||
// cached response, if any.
|
||||
func (c *AttestationCache) Get(ctx context.Context, req *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
|
||||
if req == nil {
|
||||
return nil, errors.New("nil attestation data request")
|
||||
}
|
||||
|
||||
s, e := reqToKey(req)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
|
||||
delay := minDelay
|
||||
|
||||
// Another identical request may be in progress already. Let's wait until
|
||||
// any in progress request resolves or our timeout is exceeded.
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
c.lock.RLock()
|
||||
if !c.inProgress[s] {
|
||||
c.lock.RUnlock()
|
||||
break
|
||||
}
|
||||
c.lock.RUnlock()
|
||||
|
||||
// This increasing backoff is to decrease the CPU cycles while waiting
|
||||
// for the in progress boolean to flip to false.
|
||||
time.Sleep(time.Duration(delay) * time.Nanosecond)
|
||||
delay *= delayFactor
|
||||
delay = math.Min(delay, maxDelay)
|
||||
}
|
||||
|
||||
item, exists, err := c.cache.GetByKey(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if exists && item != nil && item.(*attestationReqResWrapper).res != nil {
|
||||
attestationCacheHit.Inc()
|
||||
return ethpb.CopyAttestationData(item.(*attestationReqResWrapper).res), nil
|
||||
}
|
||||
attestationCacheMiss.Inc()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// MarkInProgress a request so that any other similar requests will block on
|
||||
// Get until MarkNotInProgress is called.
|
||||
func (c *AttestationCache) MarkInProgress(req *ethpb.AttestationDataRequest) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
s, e := reqToKey(req)
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
if c.inProgress[s] {
|
||||
return ErrAlreadyInProgress
|
||||
}
|
||||
c.inProgress[s] = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarkNotInProgress will release the lock on a given request. This should be
|
||||
// called after put.
|
||||
func (c *AttestationCache) MarkNotInProgress(req *ethpb.AttestationDataRequest) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
s, e := reqToKey(req)
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
delete(c.inProgress, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Put the response in the cache.
|
||||
func (c *AttestationCache) Put(_ context.Context, req *ethpb.AttestationDataRequest, res *ethpb.AttestationData) error {
|
||||
data := &attestationReqResWrapper{
|
||||
req,
|
||||
res,
|
||||
}
|
||||
if err := c.cache.AddIfNotPresent(data); err != nil {
|
||||
return err
|
||||
}
|
||||
trim(c.cache, maxCacheSize)
|
||||
|
||||
attestationCacheSize.Set(float64(len(c.cache.List())))
|
||||
return nil
|
||||
}
|
||||
|
||||
func wrapperToKey(i interface{}) (string, error) {
|
||||
w, ok := i.(*attestationReqResWrapper)
|
||||
if !ok {
|
||||
return "", errors.New("key is not of type *attestationReqResWrapper")
|
||||
}
|
||||
if w == nil {
|
||||
return "", errors.New("nil wrapper")
|
||||
}
|
||||
if w.req == nil {
|
||||
return "", errors.New("nil wrapper.request")
|
||||
}
|
||||
return reqToKey(w.req)
|
||||
}
|
||||
|
||||
func reqToKey(req *ethpb.AttestationDataRequest) (string, error) {
|
||||
return fmt.Sprintf("%d", req.Slot), nil
|
||||
}
|
||||
|
||||
type attestationReqResWrapper struct {
|
||||
req *ethpb.AttestationDataRequest
|
||||
res *ethpb.AttestationData
|
||||
}
|
||||
|
||||
64
beacon-chain/cache/attestation_data_test.go
vendored
64
beacon-chain/cache/attestation_data_test.go
vendored
@@ -1,53 +1,41 @@
|
||||
package cache_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
|
||||
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
|
||||
"github.com/stretchr/testify/require"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/assert"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestAttestationCache_RoundTrip(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
c := cache.NewAttestationCache()
|
||||
|
||||
a := c.Get()
|
||||
require.Nil(t, a)
|
||||
|
||||
insert := &cache.AttestationConsensusData{
|
||||
Slot: 1,
|
||||
HeadRoot: []byte{1},
|
||||
Target: forkchoicetypes.Checkpoint{
|
||||
Epoch: 2,
|
||||
Root: [32]byte{3},
|
||||
},
|
||||
Source: forkchoicetypes.Checkpoint{
|
||||
Epoch: 4,
|
||||
Root: [32]byte{5},
|
||||
},
|
||||
}
|
||||
err := c.Put(insert)
|
||||
require.NoError(t, err)
|
||||
|
||||
a = c.Get()
|
||||
require.Equal(t, insert, a)
|
||||
|
||||
insert = &cache.AttestationConsensusData{
|
||||
Slot: 6,
|
||||
HeadRoot: []byte{7},
|
||||
Target: forkchoicetypes.Checkpoint{
|
||||
Epoch: 8,
|
||||
Root: [32]byte{9},
|
||||
},
|
||||
Source: forkchoicetypes.Checkpoint{
|
||||
Epoch: 10,
|
||||
Root: [32]byte{11},
|
||||
},
|
||||
req := ðpb.AttestationDataRequest{
|
||||
CommitteeIndex: 0,
|
||||
Slot: 1,
|
||||
}
|
||||
|
||||
err = c.Put(insert)
|
||||
require.NoError(t, err)
|
||||
response, err := c.Get(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, (*ethpb.AttestationData)(nil), response)
|
||||
|
||||
a = c.Get()
|
||||
require.Equal(t, insert, a)
|
||||
assert.NoError(t, c.MarkInProgress(req))
|
||||
|
||||
res := ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 5, Root: make([]byte, 32)},
|
||||
}
|
||||
|
||||
assert.NoError(t, c.Put(ctx, req, res))
|
||||
assert.NoError(t, c.MarkNotInProgress(req))
|
||||
|
||||
response, err = c.Get(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if !proto.Equal(response, res) {
|
||||
t.Error("Expected equal protos to return from cache")
|
||||
}
|
||||
}
|
||||
|
||||
7
beacon-chain/cache/common.go
vendored
7
beacon-chain/cache/common.go
vendored
@@ -1,9 +1,16 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
var (
|
||||
// maxCacheSize is 4x of the epoch length for additional cache padding.
|
||||
// Requests should be only accessing committees within defined epoch length.
|
||||
maxCacheSize = uint64(4 * params.BeaconConfig().SlotsPerEpoch)
|
||||
)
|
||||
|
||||
// trim the FIFO queue to the maxSize.
|
||||
func trim(queue *cache.FIFO, maxSize uint64) {
|
||||
for s := uint64(len(queue.ListKeys())); s > maxSize; s-- {
|
||||
|
||||
5
beacon-chain/cache/error.go
vendored
5
beacon-chain/cache/error.go
vendored
@@ -14,9 +14,4 @@ var (
|
||||
errNotSyncCommitteeIndexPosition = errors.New("not syncCommitteeIndexPosition struct")
|
||||
// ErrNotFoundRegistration when validator registration does not exist in cache.
|
||||
ErrNotFoundRegistration = errors.Wrap(ErrNotFound, "no validator registered")
|
||||
|
||||
// ErrAlreadyInProgress appears when attempting to mark a cache as in progress while it is
|
||||
// already in progress. The client should handle this error and wait for the in progress
|
||||
// data to resolve via Get.
|
||||
ErrAlreadyInProgress = errors.New("already in progress")
|
||||
)
|
||||
|
||||
5
beacon-chain/cache/skip_slot_cache.go
vendored
5
beacon-chain/cache/skip_slot_cache.go
vendored
@@ -15,11 +15,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// Delay parameters
|
||||
minDelay = float64(10) // 10 nanoseconds
|
||||
maxDelay = float64(100000000) // 0.1 second
|
||||
delayFactor = 1.1
|
||||
|
||||
// Metrics
|
||||
skipSlotCacheHit = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "skip_slot_cache_hit",
|
||||
|
||||
@@ -20,7 +20,6 @@ go_library(
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/time:go_default_library",
|
||||
"//beacon-chain/core/transition:go_default_library",
|
||||
"//beacon-chain/forkchoice/types:go_default_library",
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
|
||||
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
|
||||
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
|
||||
beaconState "github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
@@ -315,9 +314,6 @@ func (s *Service) AggregatedSigAndAggregationBits(
|
||||
func (s *Service) GetAttestationData(
|
||||
ctx context.Context, req *ethpb.AttestationDataRequest,
|
||||
) (*ethpb.AttestationData, *RpcError) {
|
||||
if req.Slot != s.GenesisTimeFetcher.CurrentSlot() {
|
||||
return nil, &RpcError{Reason: BadRequest, Err: errors.Errorf("invalid request: slot %d is not the current slot %d", req.Slot, s.GenesisTimeFetcher.CurrentSlot())}
|
||||
}
|
||||
if err := helpers.ValidateAttestationTime(
|
||||
req.Slot,
|
||||
s.GenesisTimeFetcher.GenesisTime(),
|
||||
@@ -326,49 +322,35 @@ func (s *Service) GetAttestationData(
|
||||
return nil, &RpcError{Reason: BadRequest, Err: errors.Errorf("invalid request: %v", err)}
|
||||
}
|
||||
|
||||
s.AttestationCache.RLock()
|
||||
res := s.AttestationCache.Get()
|
||||
if res != nil && res.Slot == req.Slot {
|
||||
s.AttestationCache.RUnlock()
|
||||
return ðpb.AttestationData{
|
||||
Slot: res.Slot,
|
||||
CommitteeIndex: req.CommitteeIndex,
|
||||
BeaconBlockRoot: res.HeadRoot,
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: res.Source.Epoch,
|
||||
Root: res.Source.Root[:],
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: res.Target.Epoch,
|
||||
Root: res.Target.Root[:],
|
||||
},
|
||||
}, nil
|
||||
res, err := s.AttestationCache.Get(ctx, req)
|
||||
if err != nil {
|
||||
return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve data from attestation cache: %v", err)}
|
||||
}
|
||||
s.AttestationCache.RUnlock()
|
||||
|
||||
s.AttestationCache.Lock()
|
||||
defer s.AttestationCache.Unlock()
|
||||
|
||||
// We check the cache again as in the event there are multiple inflight requests for
|
||||
// the same attestation data, the cache might have been filled while we were waiting
|
||||
// to acquire the lock.
|
||||
res = s.AttestationCache.Get()
|
||||
if res != nil && res.Slot == req.Slot {
|
||||
return ðpb.AttestationData{
|
||||
Slot: res.Slot,
|
||||
CommitteeIndex: req.CommitteeIndex,
|
||||
BeaconBlockRoot: res.HeadRoot,
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: res.Source.Epoch,
|
||||
Root: res.Source.Root[:],
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: res.Target.Epoch,
|
||||
Root: res.Target.Root[:],
|
||||
},
|
||||
}, nil
|
||||
if res != nil {
|
||||
res.CommitteeIndex = req.CommitteeIndex
|
||||
return res, nil
|
||||
}
|
||||
|
||||
if err := s.AttestationCache.MarkInProgress(req); err != nil {
|
||||
if errors.Is(err, cache.ErrAlreadyInProgress) {
|
||||
res, err := s.AttestationCache.Get(ctx, req)
|
||||
if err != nil {
|
||||
return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve data from attestation cache: %v", err)}
|
||||
}
|
||||
if res == nil {
|
||||
return nil, &RpcError{Reason: Internal, Err: errors.New("a request was in progress and resolved to nil")}
|
||||
}
|
||||
res.CommitteeIndex = req.CommitteeIndex
|
||||
return res, nil
|
||||
}
|
||||
return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not mark attestation as in-progress: %v", err)}
|
||||
}
|
||||
defer func() {
|
||||
if err := s.AttestationCache.MarkNotInProgress(req); err != nil {
|
||||
log.WithError(err).Error("could not mark attestation as not-in-progress")
|
||||
}
|
||||
}()
|
||||
|
||||
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
|
||||
if err != nil {
|
||||
return nil, &RpcError{Reason: Internal, Err: errors.Wrap(err, "could not get head root")}
|
||||
@@ -379,34 +361,25 @@ func (s *Service) GetAttestationData(
|
||||
return nil, &RpcError{Reason: Internal, Err: errors.Wrap(err, "could not get target root")}
|
||||
}
|
||||
justifiedCheckpoint := s.FinalizedFetcher.CurrentJustifiedCheckpt()
|
||||
if err = s.AttestationCache.Put(&cache.AttestationConsensusData{
|
||||
Slot: req.Slot,
|
||||
HeadRoot: headRoot,
|
||||
Target: forkchoicetypes.Checkpoint{
|
||||
Epoch: targetEpoch,
|
||||
Root: targetRoot,
|
||||
},
|
||||
Source: forkchoicetypes.Checkpoint{
|
||||
Epoch: justifiedCheckpoint.Epoch,
|
||||
Root: bytesutil.ToBytes32(justifiedCheckpoint.Root),
|
||||
},
|
||||
}); err != nil {
|
||||
log.WithError(err).Error("Failed to put attestation data into cache")
|
||||
if justifiedCheckpoint == nil {
|
||||
return nil, &RpcError{Reason: Internal, Err: errors.New("could not get justified checkpoint")}
|
||||
}
|
||||
|
||||
return ðpb.AttestationData{
|
||||
res = ðpb.AttestationData{
|
||||
Slot: req.Slot,
|
||||
CommitteeIndex: req.CommitteeIndex,
|
||||
BeaconBlockRoot: headRoot,
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: justifiedCheckpoint.Epoch,
|
||||
Root: justifiedCheckpoint.Root,
|
||||
},
|
||||
Source: justifiedCheckpoint,
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: targetEpoch,
|
||||
Root: targetRoot[:],
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
if err := s.AttestationCache.Put(ctx, req, res); err != nil {
|
||||
log.WithError(err).Error("could not store attestation data in cache")
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// SubmitSyncMessage submits the sync committee message to the network.
|
||||
|
||||
@@ -76,6 +76,7 @@ go_test(
|
||||
"//beacon-chain/rpc/eth/shared/testing:go_default_library",
|
||||
"//beacon-chain/rpc/testutil:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//beacon-chain/state/state-native:go_default_library",
|
||||
"//beacon-chain/state/stategen:go_default_library",
|
||||
"//beacon-chain/sync/initial-sync/testing:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -29,6 +30,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/testutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
|
||||
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
@@ -811,18 +813,29 @@ func TestGetAttestationData(t *testing.T) {
|
||||
require.NoError(t, err, "Could not hash beacon block")
|
||||
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for justified block")
|
||||
targetRoot, err := targetBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for target block")
|
||||
slot := 3*params.BeaconConfig().SlotsPerEpoch + 1
|
||||
justifiedCheckpoint := ðpbalpha.Checkpoint{
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconState.SetSlot(slot))
|
||||
err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{
|
||||
Epoch: 2,
|
||||
Root: justifiedRoot[:],
|
||||
}
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
blockRoots := beaconState.BlockRoots()
|
||||
blockRoots[1] = blockRoot[:]
|
||||
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
|
||||
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
|
||||
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
chain := &mockChain.ChainService{
|
||||
Optimistic: false,
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
Root: blockRoot[:],
|
||||
CurrentJustifiedCheckPoint: justifiedCheckpoint,
|
||||
TargetRoot: blockRoot,
|
||||
Optimistic: false,
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
State: beaconState,
|
||||
Root: blockRoot[:],
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
@@ -831,10 +844,9 @@ func TestGetAttestationData(t *testing.T) {
|
||||
TimeFetcher: chain,
|
||||
OptimisticModeFetcher: chain,
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
FinalizedFetcher: chain,
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -902,10 +914,9 @@ func TestGetAttestationData(t *testing.T) {
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
chain := &mockChain.ChainService{
|
||||
Optimistic: true,
|
||||
State: beaconState,
|
||||
Genesis: time.Now(),
|
||||
CurrentJustifiedCheckPoint: ðpbalpha.Checkpoint{},
|
||||
Optimistic: true,
|
||||
State: beaconState,
|
||||
Genesis: time.Now(),
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
@@ -917,7 +928,6 @@ func TestGetAttestationData(t *testing.T) {
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
GenesisTimeFetcher: chain,
|
||||
HeadFetcher: chain,
|
||||
FinalizedFetcher: chain,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -944,13 +954,97 @@ func TestGetAttestationData(t *testing.T) {
|
||||
assert.Equal(t, http.StatusOK, writer.Code)
|
||||
})
|
||||
|
||||
t.Run("handles in progress request", func(t *testing.T) {
|
||||
state, err := state_native.InitializeFromProtoPhase0(ðpbalpha.BeaconState{Slot: 100})
|
||||
require.NoError(t, err)
|
||||
ctx := context.Background()
|
||||
slot := primitives.Slot(2)
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
chain := &mockChain.ChainService{
|
||||
Optimistic: false,
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
State: state,
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
HeadFetcher: chain,
|
||||
TimeFetcher: chain,
|
||||
OptimisticModeFetcher: chain,
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
},
|
||||
}
|
||||
|
||||
expectedResponse := &GetAttestationDataResponse{
|
||||
Data: &shared.AttestationData{
|
||||
Slot: strconv.FormatUint(uint64(slot), 10),
|
||||
CommitteeIndex: strconv.FormatUint(1, 10),
|
||||
BeaconBlockRoot: hexutil.Encode(make([]byte, 32)),
|
||||
Source: &shared.Checkpoint{
|
||||
Epoch: strconv.FormatUint(42, 10),
|
||||
Root: hexutil.Encode(make([]byte, 32)),
|
||||
},
|
||||
Target: &shared.Checkpoint{
|
||||
Epoch: strconv.FormatUint(55, 10),
|
||||
Root: hexutil.Encode(make([]byte, 32)),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
expectedResponsePb := ðpbalpha.AttestationData{
|
||||
Slot: slot,
|
||||
CommitteeIndex: 1,
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Source: ðpbalpha.Checkpoint{Epoch: 42, Root: make([]byte, 32)},
|
||||
Target: ðpbalpha.Checkpoint{Epoch: 55, Root: make([]byte, 32)},
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://example.com?slot=%d&committee_index=%d", slot, 1)
|
||||
request := httptest.NewRequest(http.MethodGet, url, nil)
|
||||
writer := httptest.NewRecorder()
|
||||
writer.Body = &bytes.Buffer{}
|
||||
|
||||
requestPb := ðpbalpha.AttestationDataRequest{
|
||||
CommitteeIndex: 1,
|
||||
Slot: slot,
|
||||
}
|
||||
|
||||
require.NoError(t, s.CoreService.AttestationCache.MarkInProgress(requestPb))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.GetAttestationData(writer, request)
|
||||
|
||||
assert.Equal(t, http.StatusOK, writer.Code)
|
||||
resp := &GetAttestationDataResponse{}
|
||||
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
|
||||
require.NotNil(t, resp)
|
||||
assert.DeepEqual(t, expectedResponse, resp)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
assert.NoError(t, s.CoreService.AttestationCache.Put(ctx, requestPb, expectedResponsePb))
|
||||
assert.NoError(t, s.CoreService.AttestationCache.MarkNotInProgress(requestPb))
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
t.Run("invalid slot", func(t *testing.T) {
|
||||
slot := 3*params.BeaconConfig().SlotsPerEpoch + 1
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
chain := &mockChain.ChainService{
|
||||
Optimistic: false,
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
CurrentJustifiedCheckPoint: ðpbalpha.Checkpoint{},
|
||||
Optimistic: false,
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
@@ -960,7 +1054,6 @@ func TestGetAttestationData(t *testing.T) {
|
||||
OptimisticModeFetcher: chain,
|
||||
CoreService: &core.Service{
|
||||
GenesisTimeFetcher: chain,
|
||||
FinalizedFetcher: chain,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -978,7 +1071,7 @@ func TestGetAttestationData(t *testing.T) {
|
||||
assert.Equal(t, true, strings.Contains(e.Message, "invalid request"))
|
||||
})
|
||||
|
||||
t.Run("request slot is not current slot", func(t *testing.T) {
|
||||
t.Run("head state slot greater than request slot", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
db := dbutil.SetupDB(t)
|
||||
|
||||
@@ -998,17 +1091,37 @@ func TestGetAttestationData(t *testing.T) {
|
||||
util.SaveBlock(t, ctx, db, block2)
|
||||
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for justified block")
|
||||
justifiedCheckpoint := ðpbalpha.Checkpoint{
|
||||
targetRoot, err := targetBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for target block")
|
||||
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconState.SetSlot(slot))
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix()-offset)))
|
||||
err = beaconState.SetLatestBlockHeader(util.HydrateBeaconHeader(ðpbalpha.BeaconBlockHeader{
|
||||
ParentRoot: blockRoot2[:],
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{
|
||||
Epoch: 2,
|
||||
Root: justifiedRoot[:],
|
||||
}
|
||||
})
|
||||
require.NoError(t, err)
|
||||
blockRoots := beaconState.BlockRoots()
|
||||
blockRoots[1] = blockRoot[:]
|
||||
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
|
||||
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
|
||||
blockRoots[3*params.BeaconConfig().SlotsPerEpoch] = blockRoot2[:]
|
||||
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
|
||||
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
beaconstate := beaconState.Copy()
|
||||
require.NoError(t, beaconstate.SetSlot(beaconstate.Slot()-1))
|
||||
require.NoError(t, db.SaveState(ctx, beaconstate, blockRoot2))
|
||||
chain := &mockChain.ChainService{
|
||||
Root: blockRoot[:],
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
CurrentJustifiedCheckPoint: justifiedCheckpoint,
|
||||
TargetRoot: blockRoot2,
|
||||
State: beaconState,
|
||||
Root: blockRoot[:],
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
@@ -1017,20 +1130,45 @@ func TestGetAttestationData(t *testing.T) {
|
||||
TimeFetcher: chain,
|
||||
OptimisticModeFetcher: chain,
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
StateGen: stategen.New(db, doublylinkedtree.New()),
|
||||
FinalizedFetcher: chain,
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, db.SaveState(ctx, beaconState, blockRoot))
|
||||
util.SaveBlock(t, ctx, db, block)
|
||||
require.NoError(t, db.SaveHeadBlockRoot(ctx, blockRoot))
|
||||
|
||||
url := fmt.Sprintf("http://example.com?slot=%d&committee_index=%d", slot-1, 0)
|
||||
request := httptest.NewRequest(http.MethodGet, url, nil)
|
||||
writer := httptest.NewRecorder()
|
||||
writer.Body = &bytes.Buffer{}
|
||||
|
||||
s.GetAttestationData(writer, request)
|
||||
assert.Equal(t, http.StatusBadRequest, writer.Code)
|
||||
|
||||
expectedResponse := &GetAttestationDataResponse{
|
||||
Data: &shared.AttestationData{
|
||||
Slot: strconv.FormatUint(uint64(slot-1), 10),
|
||||
CommitteeIndex: strconv.FormatUint(0, 10),
|
||||
BeaconBlockRoot: hexutil.Encode(blockRoot2[:]),
|
||||
Source: &shared.Checkpoint{
|
||||
Epoch: strconv.FormatUint(2, 10),
|
||||
Root: hexutil.Encode(justifiedRoot[:]),
|
||||
},
|
||||
Target: &shared.Checkpoint{
|
||||
Epoch: strconv.FormatUint(3, 10),
|
||||
Root: hexutil.Encode(blockRoot2[:]),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, http.StatusOK, writer.Code)
|
||||
resp := &GetAttestationDataResponse{}
|
||||
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
|
||||
require.NotNil(t, resp)
|
||||
assert.DeepEqual(t, expectedResponse, resp)
|
||||
})
|
||||
|
||||
t.Run("succeeds in first epoch", func(t *testing.T) {
|
||||
@@ -1045,18 +1183,27 @@ func TestGetAttestationData(t *testing.T) {
|
||||
require.NoError(t, err, "Could not hash beacon block")
|
||||
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for justified block")
|
||||
targetRoot, err := targetBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for target block")
|
||||
|
||||
justifiedCheckpt := ðpbalpha.Checkpoint{
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconState.SetSlot(slot))
|
||||
err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: justifiedRoot[:],
|
||||
}
|
||||
})
|
||||
require.NoError(t, err)
|
||||
blockRoots := beaconState.BlockRoots()
|
||||
blockRoots[1] = blockRoot[:]
|
||||
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
|
||||
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
|
||||
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
chain := &mockChain.ChainService{
|
||||
Root: blockRoot[:],
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
CurrentJustifiedCheckPoint: justifiedCheckpt,
|
||||
TargetRoot: blockRoot,
|
||||
State: beaconState,
|
||||
Root: blockRoot[:],
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
@@ -1068,7 +1215,6 @@ func TestGetAttestationData(t *testing.T) {
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
FinalizedFetcher: chain,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1132,17 +1278,28 @@ func TestGetAttestationData(t *testing.T) {
|
||||
require.NoError(t, err, "Could not hash beacon block")
|
||||
justifiedBlockRoot, err := justifiedBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not hash justified block")
|
||||
justifiedCheckpt := ðpbalpha.Checkpoint{
|
||||
epochBoundaryRoot, err := epochBoundaryBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not hash justified block")
|
||||
slot := primitives.Slot(10000)
|
||||
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconState.SetSlot(slot))
|
||||
err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{
|
||||
Epoch: slots.ToEpoch(1500),
|
||||
Root: justifiedBlockRoot[:],
|
||||
}
|
||||
slot := primitives.Slot(10000)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
blockRoots := beaconState.BlockRoots()
|
||||
blockRoots[1] = blockRoot[:]
|
||||
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = epochBoundaryRoot[:]
|
||||
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedBlockRoot[:]
|
||||
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
chain := &mockChain.ChainService{
|
||||
Root: blockRoot[:],
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
CurrentJustifiedCheckPoint: justifiedCheckpt,
|
||||
TargetRoot: blockRoot,
|
||||
State: beaconState,
|
||||
Root: blockRoot[:],
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
@@ -1154,7 +1311,6 @@ func TestGetAttestationData(t *testing.T) {
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
FinalizedFetcher: chain,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -48,13 +48,23 @@ func TestAttestationDataAtSlot_HandlesFarAwayJustifiedEpoch(t *testing.T) {
|
||||
require.NoError(t, err, "Could not hash beacon block")
|
||||
justifiedBlockRoot, err := justifiedBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not hash justified block")
|
||||
epochBoundaryRoot, err := epochBoundaryBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not hash justified block")
|
||||
slot := primitives.Slot(10000)
|
||||
|
||||
justifiedCheckpoint := ðpb.Checkpoint{
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconState.SetSlot(slot))
|
||||
err = beaconState.SetCurrentJustifiedCheckpoint(ðpb.Checkpoint{
|
||||
Epoch: slots.ToEpoch(1500),
|
||||
Root: justifiedBlockRoot[:],
|
||||
}
|
||||
})
|
||||
require.NoError(t, err)
|
||||
slot := primitives.Slot(10000)
|
||||
blockRoots := beaconState.BlockRoots()
|
||||
blockRoots[1] = blockRoot[:]
|
||||
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = epochBoundaryRoot[:]
|
||||
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedBlockRoot[:]
|
||||
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
attesterServer := &Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
@@ -62,9 +72,8 @@ func TestAttestationDataAtSlot_HandlesFarAwayJustifiedEpoch(t *testing.T) {
|
||||
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: &mock.ChainService{TargetRoot: blockRoot, Root: blockRoot[:]},
|
||||
HeadFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:]},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
|
||||
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
|
||||
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
@@ -146,63 +147,6 @@ func TestGetAttestationData_OK(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetAttestationDataConcurrent(b *testing.B) {
|
||||
block := util.NewBeaconBlock()
|
||||
block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1
|
||||
targetBlock := util.NewBeaconBlock()
|
||||
targetBlock.Block.Slot = 1 * params.BeaconConfig().SlotsPerEpoch
|
||||
targetRoot, err := targetBlock.Block.HashTreeRoot()
|
||||
require.NoError(b, err, "Could not get signing root for target block")
|
||||
|
||||
justifiedBlock := util.NewBeaconBlock()
|
||||
justifiedBlock.Block.Slot = 2 * params.BeaconConfig().SlotsPerEpoch
|
||||
blockRoot, err := block.Block.HashTreeRoot()
|
||||
require.NoError(b, err, "Could not hash beacon block")
|
||||
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
|
||||
require.NoError(b, err, "Could not get signing root for justified block")
|
||||
slot := 3*params.BeaconConfig().SlotsPerEpoch + 1
|
||||
justifiedCheckpoint := ðpb.Checkpoint{
|
||||
Epoch: 2,
|
||||
Root: justifiedRoot[:],
|
||||
}
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
attesterServer := &Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: &mock.ChainService{TargetRoot: targetRoot, Root: blockRoot[:]},
|
||||
GenesisTimeFetcher: &mock.ChainService{
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
},
|
||||
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
|
||||
},
|
||||
}
|
||||
|
||||
req := ðpb.AttestationDataRequest{
|
||||
CommitteeIndex: 0,
|
||||
Slot: 3*params.BeaconConfig().SlotsPerEpoch + 1,
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(5000) // for 5000 concurrent accesses
|
||||
|
||||
for j := 0; j < 5000; j++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := attesterServer.GetAttestationData(context.Background(), req)
|
||||
require.NoError(b, err, "Could not get attestation info at slot")
|
||||
}()
|
||||
}
|
||||
wg.Wait() // Wait for all goroutines to finish
|
||||
}
|
||||
|
||||
b.Log("Elapsed time:", b.Elapsed())
|
||||
}
|
||||
|
||||
func TestGetAttestationData_SyncNotReady(t *testing.T) {
|
||||
as := Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: true},
|
||||
@@ -250,6 +194,59 @@ func TestGetAttestationData_Optimistic(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestAttestationDataSlot_handlesInProgressRequest(t *testing.T) {
|
||||
s := ðpb.BeaconState{Slot: 100}
|
||||
state, err := state_native.InitializeFromProtoPhase0(s)
|
||||
require.NoError(t, err)
|
||||
ctx := context.Background()
|
||||
slot := primitives.Slot(2)
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
server := &Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: &mock.ChainService{State: state},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
},
|
||||
}
|
||||
|
||||
req := ðpb.AttestationDataRequest{
|
||||
CommitteeIndex: 1,
|
||||
Slot: slot,
|
||||
}
|
||||
|
||||
res := ðpb.AttestationData{
|
||||
CommitteeIndex: 1,
|
||||
Target: ðpb.Checkpoint{Epoch: 55, Root: make([]byte, 32)},
|
||||
}
|
||||
|
||||
require.NoError(t, server.CoreService.AttestationCache.MarkInProgress(req))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
response, err := server.GetAttestationData(ctx, req)
|
||||
require.NoError(t, err)
|
||||
if !proto.Equal(res, response) {
|
||||
t.Error("Expected equal responses from cache")
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
assert.NoError(t, server.CoreService.AttestationCache.Put(ctx, req, res))
|
||||
assert.NoError(t, server.CoreService.AttestationCache.MarkNotInProgress(req))
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestServer_GetAttestationData_InvalidRequestSlot(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -271,7 +268,11 @@ func TestServer_GetAttestationData_InvalidRequestSlot(t *testing.T) {
|
||||
assert.ErrorContains(t, "invalid request", err)
|
||||
}
|
||||
|
||||
func TestServer_GetAttestationData_RequestSlotIsDifferentThanCurrentSlot(t *testing.T) {
|
||||
func TestServer_GetAttestationData_HeadStateSlotGreaterThanRequestSlot(t *testing.T) {
|
||||
// There exists a rare scenario where the validator may request an attestation for a slot less
|
||||
// than the head state's slot. The Ethereum consensus spec constraints require the block root the
|
||||
// attestation is referencing be less than or equal to the attestation data slot.
|
||||
// See: https://github.com/prysmaticlabs/prysm/issues/5164
|
||||
ctx := context.Background()
|
||||
db := dbutil.SetupDB(t)
|
||||
|
||||
@@ -291,30 +292,77 @@ func TestServer_GetAttestationData_RequestSlotIsDifferentThanCurrentSlot(t *test
|
||||
util.SaveBlock(t, ctx, db, block2)
|
||||
justifiedRoot, err := justifiedBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for justified block")
|
||||
targetRoot, err := targetBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root for target block")
|
||||
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconState.SetSlot(slot))
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix()-offset)))
|
||||
err = beaconState.SetLatestBlockHeader(util.HydrateBeaconHeader(ðpb.BeaconBlockHeader{
|
||||
ParentRoot: blockRoot2[:],
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
err = beaconState.SetCurrentJustifiedCheckpoint(ðpb.Checkpoint{
|
||||
Epoch: 2,
|
||||
Root: justifiedRoot[:],
|
||||
})
|
||||
require.NoError(t, err)
|
||||
blockRoots := beaconState.BlockRoots()
|
||||
blockRoots[1] = blockRoot[:]
|
||||
blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:]
|
||||
blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
|
||||
blockRoots[3*params.BeaconConfig().SlotsPerEpoch] = blockRoot2[:]
|
||||
require.NoError(t, beaconState.SetBlockRoots(blockRoots))
|
||||
|
||||
beaconstate := beaconState.Copy()
|
||||
require.NoError(t, beaconstate.SetSlot(beaconstate.Slot()-1))
|
||||
require.NoError(t, db.SaveState(ctx, beaconstate, blockRoot2))
|
||||
offset = int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
justifiedCheckpoint := ðpb.Checkpoint{
|
||||
Epoch: 2,
|
||||
Root: justifiedRoot[:],
|
||||
}
|
||||
offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
attesterServer := &Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{TargetRoot: blockRoot2, Root: blockRoot[:]},
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
HeadFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:], TargetRoot: blockRoot2},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
StateGen: stategen.New(db, doublylinkedtree.New()),
|
||||
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
|
||||
},
|
||||
}
|
||||
require.NoError(t, db.SaveState(ctx, beaconState, blockRoot))
|
||||
util.SaveBlock(t, ctx, db, block)
|
||||
require.NoError(t, db.SaveHeadBlockRoot(ctx, blockRoot))
|
||||
|
||||
req := ðpb.AttestationDataRequest{
|
||||
CommitteeIndex: 0,
|
||||
Slot: slot - 1,
|
||||
}
|
||||
_, err = attesterServer.GetAttestationData(ctx, req)
|
||||
require.ErrorContains(t, "invalid request: slot 24 is not the current slot 25", err)
|
||||
res, err := attesterServer.GetAttestationData(ctx, req)
|
||||
require.NoError(t, err, "Could not get attestation info at slot")
|
||||
|
||||
expectedInfo := ðpb.AttestationData{
|
||||
Slot: slot - 1,
|
||||
BeaconBlockRoot: blockRoot[:],
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: 2,
|
||||
Root: justifiedRoot[:],
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: 3,
|
||||
Root: blockRoot2[:],
|
||||
},
|
||||
}
|
||||
|
||||
if !proto.Equal(res, expectedInfo) {
|
||||
t.Errorf("Expected attestation info to match, received %v, wanted %v", res, expectedInfo)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAttestationData_SucceedsInFirstEpoch(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user