From 0eff83cb9d14a75b4e04b803c6b987cfbdb658aa Mon Sep 17 00:00:00 2001 From: terence Date: Tue, 19 Dec 2023 00:12:43 +0800 Subject: [PATCH] Use a cache of one entry to build attestation (#13300) * Use a cache of one entry to build attestation * Gazelle * Enforce on RPC side * Rm unused var * Potuz feedback, dont use pointer * Fix tests * Init fetcher * Add in-progress * Add back missing lock * Potuz feedback * Update beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go Co-authored-by: Potuz --------- Co-authored-by: Potuz --- beacon-chain/blockchain/chain_info.go | 1 + beacon-chain/blockchain/testing/mock.go | 6 + beacon-chain/cache/BUILD.bazel | 3 + beacon-chain/cache/attestation_data.go | 172 ++----------- beacon-chain/cache/attestation_data_test.go | 64 +++-- beacon-chain/cache/common.go | 7 - beacon-chain/cache/error.go | 5 + beacon-chain/cache/skip_slot_cache.go | 5 + beacon-chain/rpc/core/BUILD.bazel | 1 + beacon-chain/rpc/core/service.go | 1 + beacon-chain/rpc/core/validator.go | 152 ++++++----- beacon-chain/rpc/eth/validator/BUILD.bazel | 1 - .../rpc/eth/validator/handlers_test.go | 242 ++++-------------- .../validator/attester_mainnet_test.go | 19 +- .../prysm/v1alpha1/validator/attester_test.go | 223 ++++++---------- beacon-chain/rpc/service.go | 1 + 16 files changed, 284 insertions(+), 619 deletions(-) diff --git a/beacon-chain/blockchain/chain_info.go b/beacon-chain/blockchain/chain_info.go index 559aed7565..052e4503b8 100644 --- a/beacon-chain/blockchain/chain_info.go +++ b/beacon-chain/blockchain/chain_info.go @@ -75,6 +75,7 @@ type HeadFetcher interface { HeadPublicKeyToValidatorIndex(pubKey [fieldparams.BLSPubkeyLength]byte) (primitives.ValidatorIndex, bool) HeadValidatorIndexToPublicKey(ctx context.Context, index primitives.ValidatorIndex) ([fieldparams.BLSPubkeyLength]byte, error) ChainHeads() ([][32]byte, []primitives.Slot) + TargetRootForEpoch([32]byte, primitives.Epoch) ([32]byte, error) HeadSyncCommitteeFetcher HeadDomainFetcher } diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index 683dc3673f..c47cf5e057 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -73,6 +73,7 @@ type ChainService struct { BlockSlot primitives.Slot SyncingRoot [32]byte Blobs []blocks.VerifiedROBlob + TargetRoot [32]byte } func (s *ChainService) Ancestor(ctx context.Context, root []byte, slot primitives.Slot) ([]byte, error) { @@ -617,3 +618,8 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e c.Blobs = append(c.Blobs, b) return nil } + +// TargetRootForEpoch mocks the same method in the chain service +func (c *ChainService) TargetRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) { + return c.TargetRoot, nil +} diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index e7d9d0e6cf..39819bce66 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//tools:__subpackages__", ], deps = [ + "//beacon-chain/forkchoice/types:go_default_library", "//beacon-chain/state:go_default_library", "//cache/lru:go_default_library", "//config/fieldparams:go_default_library", @@ -78,6 +79,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//beacon-chain/forkchoice/types:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/state-native:go_default_library", "//config/fieldparams:go_default_library", @@ -90,6 +92,7 @@ go_test( "//testing/util:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_google_gofuzz//:go_default_library", + "@com_github_stretchr_testify//require:go_default_library", "@org_golang_google_protobuf//proto:go_default_library", ], ) diff --git a/beacon-chain/cache/attestation_data.go b/beacon-chain/cache/attestation_data.go index 6f284ee026..6acd425ed2 100644 --- a/beacon-chain/cache/attestation_data.go +++ b/beacon-chain/cache/attestation_data.go @@ -1,171 +1,41 @@ package cache import ( - "context" "errors" - "fmt" - "math" "sync" - "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" - "k8s.io/client-go/tools/cache" + forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ) -var ( - // Delay parameters - minDelay = float64(10) // 10 nanoseconds - maxDelay = float64(100000000) // 0.1 second - delayFactor = 1.1 +type AttestationConsensusData struct { + Slot primitives.Slot + HeadRoot []byte + Target forkchoicetypes.Checkpoint + Source forkchoicetypes.Checkpoint +} - // Metrics - attestationCacheMiss = promauto.NewCounter(prometheus.CounterOpts{ - Name: "attestation_cache_miss", - Help: "The number of attestation data requests that aren't present in the cache.", - }) - attestationCacheHit = promauto.NewCounter(prometheus.CounterOpts{ - Name: "attestation_cache_hit", - Help: "The number of attestation data requests that are present in the cache.", - }) - attestationCacheSize = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "attestation_cache_size", - Help: "The number of attestation data in the attestations cache", - }) -) - -// ErrAlreadyInProgress appears when attempting to mark a cache as in progress while it is -// already in progress. The client should handle this error and wait for the in progress -// data to resolve via Get. -var ErrAlreadyInProgress = errors.New("already in progress") - -// AttestationCache is used to store the cached results of an AttestationData request. +// AttestationCache stores cached results of AttestationData requests. type AttestationCache struct { - cache *cache.FIFO - lock sync.RWMutex - inProgress map[string]bool + a *AttestationConsensusData + sync.RWMutex } -// NewAttestationCache initializes the map and underlying cache. +// NewAttestationCache creates a new instance of AttestationCache. func NewAttestationCache() *AttestationCache { - return &AttestationCache{ - cache: cache.NewFIFO(wrapperToKey), - inProgress: make(map[string]bool), - } + return &AttestationCache{} } -// Get waits for any in progress calculation to complete before returning a -// cached response, if any. -func (c *AttestationCache) Get(ctx context.Context, req *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) { - if req == nil { - return nil, errors.New("nil attestation data request") - } - - s, e := reqToKey(req) - if e != nil { - return nil, e - } - - delay := minDelay - - // Another identical request may be in progress already. Let's wait until - // any in progress request resolves or our timeout is exceeded. - for { - if ctx.Err() != nil { - return nil, ctx.Err() - } - - c.lock.RLock() - if !c.inProgress[s] { - c.lock.RUnlock() - break - } - c.lock.RUnlock() - - // This increasing backoff is to decrease the CPU cycles while waiting - // for the in progress boolean to flip to false. - time.Sleep(time.Duration(delay) * time.Nanosecond) - delay *= delayFactor - delay = math.Min(delay, maxDelay) - } - - item, exists, err := c.cache.GetByKey(s) - if err != nil { - return nil, err - } - - if exists && item != nil && item.(*attestationReqResWrapper).res != nil { - attestationCacheHit.Inc() - return ethpb.CopyAttestationData(item.(*attestationReqResWrapper).res), nil - } - attestationCacheMiss.Inc() - return nil, nil +// Get retrieves cached attestation data, recording a cache hit or miss. This method is lock free. +func (c *AttestationCache) Get() *AttestationConsensusData { + return c.a } -// MarkInProgress a request so that any other similar requests will block on -// Get until MarkNotInProgress is called. -func (c *AttestationCache) MarkInProgress(req *ethpb.AttestationDataRequest) error { - c.lock.Lock() - defer c.lock.Unlock() - s, e := reqToKey(req) - if e != nil { - return e +// Put adds a response to the cache. This method is lock free. +func (c *AttestationCache) Put(a *AttestationConsensusData) error { + if a == nil { + return errors.New("attestation cannot be nil") } - if c.inProgress[s] { - return ErrAlreadyInProgress - } - c.inProgress[s] = true + c.a = a return nil } - -// MarkNotInProgress will release the lock on a given request. This should be -// called after put. -func (c *AttestationCache) MarkNotInProgress(req *ethpb.AttestationDataRequest) error { - c.lock.Lock() - defer c.lock.Unlock() - s, e := reqToKey(req) - if e != nil { - return e - } - delete(c.inProgress, s) - return nil -} - -// Put the response in the cache. -func (c *AttestationCache) Put(_ context.Context, req *ethpb.AttestationDataRequest, res *ethpb.AttestationData) error { - data := &attestationReqResWrapper{ - req, - res, - } - if err := c.cache.AddIfNotPresent(data); err != nil { - return err - } - trim(c.cache, maxCacheSize) - - attestationCacheSize.Set(float64(len(c.cache.List()))) - return nil -} - -func wrapperToKey(i interface{}) (string, error) { - w, ok := i.(*attestationReqResWrapper) - if !ok { - return "", errors.New("key is not of type *attestationReqResWrapper") - } - if w == nil { - return "", errors.New("nil wrapper") - } - if w.req == nil { - return "", errors.New("nil wrapper.request") - } - return reqToKey(w.req) -} - -func reqToKey(req *ethpb.AttestationDataRequest) (string, error) { - return fmt.Sprintf("%d", req.Slot), nil -} - -type attestationReqResWrapper struct { - req *ethpb.AttestationDataRequest - res *ethpb.AttestationData -} diff --git a/beacon-chain/cache/attestation_data_test.go b/beacon-chain/cache/attestation_data_test.go index 65c399d860..c96d93f72e 100644 --- a/beacon-chain/cache/attestation_data_test.go +++ b/beacon-chain/cache/attestation_data_test.go @@ -1,41 +1,53 @@ package cache_test import ( - "context" "testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" - ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v4/testing/assert" - "google.golang.org/protobuf/proto" + forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" + "github.com/stretchr/testify/require" ) func TestAttestationCache_RoundTrip(t *testing.T) { - ctx := context.Background() c := cache.NewAttestationCache() - req := ðpb.AttestationDataRequest{ - CommitteeIndex: 0, - Slot: 1, + a := c.Get() + require.Nil(t, a) + + insert := &cache.AttestationConsensusData{ + Slot: 1, + HeadRoot: []byte{1}, + Target: forkchoicetypes.Checkpoint{ + Epoch: 2, + Root: [32]byte{3}, + }, + Source: forkchoicetypes.Checkpoint{ + Epoch: 4, + Root: [32]byte{5}, + }, + } + err := c.Put(insert) + require.NoError(t, err) + + a = c.Get() + require.Equal(t, insert, a) + + insert = &cache.AttestationConsensusData{ + Slot: 6, + HeadRoot: []byte{7}, + Target: forkchoicetypes.Checkpoint{ + Epoch: 8, + Root: [32]byte{9}, + }, + Source: forkchoicetypes.Checkpoint{ + Epoch: 10, + Root: [32]byte{11}, + }, } - response, err := c.Get(ctx, req) - assert.NoError(t, err) - assert.Equal(t, (*ethpb.AttestationData)(nil), response) + err = c.Put(insert) + require.NoError(t, err) - assert.NoError(t, c.MarkInProgress(req)) - - res := ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 5, Root: make([]byte, 32)}, - } - - assert.NoError(t, c.Put(ctx, req, res)) - assert.NoError(t, c.MarkNotInProgress(req)) - - response, err = c.Get(ctx, req) - assert.NoError(t, err) - - if !proto.Equal(response, res) { - t.Error("Expected equal protos to return from cache") - } + a = c.Get() + require.Equal(t, insert, a) } diff --git a/beacon-chain/cache/common.go b/beacon-chain/cache/common.go index 86f10b41fb..be1950c165 100644 --- a/beacon-chain/cache/common.go +++ b/beacon-chain/cache/common.go @@ -1,16 +1,9 @@ package cache import ( - "github.com/prysmaticlabs/prysm/v4/config/params" "k8s.io/client-go/tools/cache" ) -var ( - // maxCacheSize is 4x of the epoch length for additional cache padding. - // Requests should be only accessing committees within defined epoch length. - maxCacheSize = uint64(4 * params.BeaconConfig().SlotsPerEpoch) -) - // trim the FIFO queue to the maxSize. func trim(queue *cache.FIFO, maxSize uint64) { for s := uint64(len(queue.ListKeys())); s > maxSize; s-- { diff --git a/beacon-chain/cache/error.go b/beacon-chain/cache/error.go index 2027273dbf..3805f959f7 100644 --- a/beacon-chain/cache/error.go +++ b/beacon-chain/cache/error.go @@ -14,4 +14,9 @@ var ( errNotSyncCommitteeIndexPosition = errors.New("not syncCommitteeIndexPosition struct") // ErrNotFoundRegistration when validator registration does not exist in cache. ErrNotFoundRegistration = errors.Wrap(ErrNotFound, "no validator registered") + + // ErrAlreadyInProgress appears when attempting to mark a cache as in progress while it is + // already in progress. The client should handle this error and wait for the in progress + // data to resolve via Get. + ErrAlreadyInProgress = errors.New("already in progress") ) diff --git a/beacon-chain/cache/skip_slot_cache.go b/beacon-chain/cache/skip_slot_cache.go index 5347940a81..5c2289e927 100644 --- a/beacon-chain/cache/skip_slot_cache.go +++ b/beacon-chain/cache/skip_slot_cache.go @@ -15,6 +15,11 @@ import ( ) var ( + // Delay parameters + minDelay = float64(10) // 10 nanoseconds + maxDelay = float64(100000000) // 0.1 second + delayFactor = 1.1 + // Metrics skipSlotCacheHit = promauto.NewCounter(prometheus.CounterOpts{ Name: "skip_slot_cache_hit", diff --git a/beacon-chain/rpc/core/BUILD.bazel b/beacon-chain/rpc/core/BUILD.bazel index eab9b31241..cdca729eb6 100644 --- a/beacon-chain/rpc/core/BUILD.bazel +++ b/beacon-chain/rpc/core/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", + "//beacon-chain/forkchoice/types:go_default_library", "//beacon-chain/operations/synccommittee:go_default_library", "//beacon-chain/p2p:go_default_library", "//beacon-chain/state:go_default_library", diff --git a/beacon-chain/rpc/core/service.go b/beacon-chain/rpc/core/service.go index 56a051c551..aba5248998 100644 --- a/beacon-chain/rpc/core/service.go +++ b/beacon-chain/rpc/core/service.go @@ -12,6 +12,7 @@ import ( type Service struct { HeadFetcher blockchain.HeadFetcher + FinalizedFetcher blockchain.FinalizationFetcher GenesisTimeFetcher blockchain.TimeFetcher SyncChecker sync.Checker Broadcaster p2p.Broadcaster diff --git a/beacon-chain/rpc/core/validator.go b/beacon-chain/rpc/core/validator.go index dd766d6997..0ba7960728 100644 --- a/beacon-chain/rpc/core/validator.go +++ b/beacon-chain/rpc/core/validator.go @@ -16,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" + forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" beaconState "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" @@ -314,6 +315,9 @@ func (s *Service) AggregatedSigAndAggregationBits( func (s *Service) GetAttestationData( ctx context.Context, req *ethpb.AttestationDataRequest, ) (*ethpb.AttestationData, *RpcError) { + if req.Slot != s.GenesisTimeFetcher.CurrentSlot() { + return nil, &RpcError{Reason: BadRequest, Err: errors.Errorf("invalid request: slot %d is not the current slot %d", req.Slot, s.GenesisTimeFetcher.CurrentSlot())} + } if err := helpers.ValidateAttestationTime( req.Slot, s.GenesisTimeFetcher.GenesisTime(), @@ -322,99 +326,87 @@ func (s *Service) GetAttestationData( return nil, &RpcError{Reason: BadRequest, Err: errors.Errorf("invalid request: %v", err)} } - res, err := s.AttestationCache.Get(ctx, req) - if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve data from attestation cache: %v", err)} + s.AttestationCache.RLock() + res := s.AttestationCache.Get() + if res != nil && res.Slot == req.Slot { + s.AttestationCache.RUnlock() + return ðpb.AttestationData{ + Slot: res.Slot, + CommitteeIndex: req.CommitteeIndex, + BeaconBlockRoot: res.HeadRoot, + Source: ðpb.Checkpoint{ + Epoch: res.Source.Epoch, + Root: res.Source.Root[:], + }, + Target: ðpb.Checkpoint{ + Epoch: res.Target.Epoch, + Root: res.Target.Root[:], + }, + }, nil } - if res != nil { - res.CommitteeIndex = req.CommitteeIndex - return res, nil + s.AttestationCache.RUnlock() + + s.AttestationCache.Lock() + defer s.AttestationCache.Unlock() + + // We check the cache again as in the event there are multiple inflight requests for + // the same attestation data, the cache might have been filled while we were waiting + // to acquire the lock. + res = s.AttestationCache.Get() + if res != nil && res.Slot == req.Slot { + return ðpb.AttestationData{ + Slot: res.Slot, + CommitteeIndex: req.CommitteeIndex, + BeaconBlockRoot: res.HeadRoot, + Source: ðpb.Checkpoint{ + Epoch: res.Source.Epoch, + Root: res.Source.Root[:], + }, + Target: ðpb.Checkpoint{ + Epoch: res.Target.Epoch, + Root: res.Target.Root[:], + }, + }, nil } - if err := s.AttestationCache.MarkInProgress(req); err != nil { - if errors.Is(err, cache.ErrAlreadyInProgress) { - res, err := s.AttestationCache.Get(ctx, req) - if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve data from attestation cache: %v", err)} - } - if res == nil { - return nil, &RpcError{Reason: Internal, Err: errors.New("a request was in progress and resolved to nil")} - } - res.CommitteeIndex = req.CommitteeIndex - return res, nil - } - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not mark attestation as in-progress: %v", err)} - } - defer func() { - if err := s.AttestationCache.MarkNotInProgress(req); err != nil { - log.WithError(err).Error("could not mark attestation as not-in-progress") - } - }() - - headState, err := s.HeadFetcher.HeadState(ctx) - if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve head state: %v", err)} - } headRoot, err := s.HeadFetcher.HeadRoot(ctx) if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not retrieve head root: %v", err)} + return nil, &RpcError{Reason: Internal, Err: errors.Wrap(err, "could not get head root")} } - - // In the case that we receive an attestation request after a newer state/block has been processed. - if headState.Slot() > req.Slot { - headRoot, err = helpers.BlockRootAtSlot(headState, req.Slot) - if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not get historical head root: %v", err)} - } - headState, err = s.StateGen.StateByRoot(ctx, bytesutil.ToBytes32(headRoot)) - if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not get historical head state: %v", err)} - } - } - if headState == nil || headState.IsNil() { - return nil, &RpcError{Reason: Internal, Err: errors.New("could not lookup parent state from head")} - } - - if coreTime.CurrentEpoch(headState) < slots.ToEpoch(req.Slot) { - headState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, headState, headRoot, req.Slot) - if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not process slots up to %d: %v", req.Slot, err)} - } - } - - targetEpoch := coreTime.CurrentEpoch(headState) - epochStartSlot, err := slots.EpochStart(targetEpoch) + targetEpoch := slots.ToEpoch(req.Slot) + targetRoot, err := s.HeadFetcher.TargetRootForEpoch(bytesutil.ToBytes32(headRoot), targetEpoch) if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not calculate epoch start: %v", err)} + return nil, &RpcError{Reason: Internal, Err: errors.Wrap(err, "could not get target root")} } - var targetRoot []byte - if epochStartSlot == headState.Slot() { - targetRoot = headRoot - } else { - targetRoot, err = helpers.BlockRootAtSlot(headState, epochStartSlot) - if err != nil { - return nil, &RpcError{Reason: Internal, Err: errors.Errorf("could not get target block for slot %d: %v", epochStartSlot, err)} - } - if bytesutil.ToBytes32(targetRoot) == params.BeaconConfig().ZeroHash { - targetRoot = headRoot - } - } - - res = ðpb.AttestationData{ - Slot: req.Slot, - CommitteeIndex: req.CommitteeIndex, - BeaconBlockRoot: headRoot, - Source: headState.CurrentJustifiedCheckpoint(), - Target: ðpb.Checkpoint{ + justifiedCheckpoint := s.FinalizedFetcher.CurrentJustifiedCheckpt() + if err = s.AttestationCache.Put(&cache.AttestationConsensusData{ + Slot: req.Slot, + HeadRoot: headRoot, + Target: forkchoicetypes.Checkpoint{ Epoch: targetEpoch, Root: targetRoot, }, + Source: forkchoicetypes.Checkpoint{ + Epoch: justifiedCheckpoint.Epoch, + Root: bytesutil.ToBytes32(justifiedCheckpoint.Root), + }, + }); err != nil { + log.WithError(err).Error("Failed to put attestation data into cache") } - if err := s.AttestationCache.Put(ctx, req, res); err != nil { - log.WithError(err).Error("could not store attestation data in cache") - } - return res, nil + return ðpb.AttestationData{ + Slot: req.Slot, + CommitteeIndex: req.CommitteeIndex, + BeaconBlockRoot: headRoot, + Source: ðpb.Checkpoint{ + Epoch: justifiedCheckpoint.Epoch, + Root: justifiedCheckpoint.Root, + }, + Target: ðpb.Checkpoint{ + Epoch: targetEpoch, + Root: targetRoot[:], + }, + }, nil } // SubmitSyncMessage submits the sync committee message to the network. diff --git a/beacon-chain/rpc/eth/validator/BUILD.bazel b/beacon-chain/rpc/eth/validator/BUILD.bazel index 10840f595a..7320ffc0b9 100644 --- a/beacon-chain/rpc/eth/validator/BUILD.bazel +++ b/beacon-chain/rpc/eth/validator/BUILD.bazel @@ -77,7 +77,6 @@ go_test( "//beacon-chain/rpc/eth/shared/testing:go_default_library", "//beacon-chain/rpc/testutil:go_default_library", "//beacon-chain/state:go_default_library", - "//beacon-chain/state/state-native:go_default_library", "//beacon-chain/state/stategen:go_default_library", "//beacon-chain/sync/initial-sync/testing:go_default_library", "//config/fieldparams:go_default_library", diff --git a/beacon-chain/rpc/eth/validator/handlers_test.go b/beacon-chain/rpc/eth/validator/handlers_test.go index 03eb146b19..480869db1f 100644 --- a/beacon-chain/rpc/eth/validator/handlers_test.go +++ b/beacon-chain/rpc/eth/validator/handlers_test.go @@ -9,7 +9,6 @@ import ( "net/http/httptest" "strconv" "strings" - "sync" "testing" "time" @@ -31,7 +30,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/testutil" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" - state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" @@ -814,29 +812,18 @@ func TestGetAttestationData(t *testing.T) { require.NoError(t, err, "Could not hash beacon block") justifiedRoot, err := justifiedBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not get signing root for justified block") - targetRoot, err := targetBlock.Block.HashTreeRoot() - require.NoError(t, err, "Could not get signing root for target block") slot := 3*params.BeaconConfig().SlotsPerEpoch + 1 - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{ + justifiedCheckpoint := ðpbalpha.Checkpoint{ Epoch: 2, Root: justifiedRoot[:], - }) - require.NoError(t, err) - - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) + } offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) chain := &mockChain.ChainService{ - Optimistic: false, - Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), - State: beaconState, - Root: blockRoot[:], + Optimistic: false, + Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + Root: blockRoot[:], + CurrentJustifiedCheckPoint: justifiedCheckpoint, + TargetRoot: blockRoot, } s := &Server{ @@ -845,9 +832,10 @@ func TestGetAttestationData(t *testing.T) { TimeFetcher: chain, OptimisticModeFetcher: chain, CoreService: &core.Service{ - AttestationCache: cache.NewAttestationCache(), HeadFetcher: chain, GenesisTimeFetcher: chain, + FinalizedFetcher: chain, + AttestationCache: cache.NewAttestationCache(), }, } @@ -915,9 +903,10 @@ func TestGetAttestationData(t *testing.T) { beaconState, err := util.NewBeaconState() require.NoError(t, err) chain := &mockChain.ChainService{ - Optimistic: true, - State: beaconState, - Genesis: time.Now(), + Optimistic: true, + State: beaconState, + Genesis: time.Now(), + CurrentJustifiedCheckPoint: ðpbalpha.Checkpoint{}, } s := &Server{ @@ -929,6 +918,7 @@ func TestGetAttestationData(t *testing.T) { AttestationCache: cache.NewAttestationCache(), GenesisTimeFetcher: chain, HeadFetcher: chain, + FinalizedFetcher: chain, }, } @@ -955,97 +945,13 @@ func TestGetAttestationData(t *testing.T) { assert.Equal(t, http.StatusOK, writer.Code) }) - t.Run("handles in progress request", func(t *testing.T) { - state, err := state_native.InitializeFromProtoPhase0(ðpbalpha.BeaconState{Slot: 100}) - require.NoError(t, err) - ctx := context.Background() - slot := primitives.Slot(2) - offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) - chain := &mockChain.ChainService{ - Optimistic: false, - Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), - State: state, - } - - s := &Server{ - SyncChecker: &mockSync.Sync{IsSyncing: false}, - HeadFetcher: chain, - TimeFetcher: chain, - OptimisticModeFetcher: chain, - CoreService: &core.Service{ - AttestationCache: cache.NewAttestationCache(), - HeadFetcher: chain, - GenesisTimeFetcher: chain, - }, - } - - expectedResponse := &GetAttestationDataResponse{ - Data: &shared.AttestationData{ - Slot: strconv.FormatUint(uint64(slot), 10), - CommitteeIndex: strconv.FormatUint(1, 10), - BeaconBlockRoot: hexutil.Encode(make([]byte, 32)), - Source: &shared.Checkpoint{ - Epoch: strconv.FormatUint(42, 10), - Root: hexutil.Encode(make([]byte, 32)), - }, - Target: &shared.Checkpoint{ - Epoch: strconv.FormatUint(55, 10), - Root: hexutil.Encode(make([]byte, 32)), - }, - }, - } - - expectedResponsePb := ðpbalpha.AttestationData{ - Slot: slot, - CommitteeIndex: 1, - BeaconBlockRoot: make([]byte, 32), - Source: ðpbalpha.Checkpoint{Epoch: 42, Root: make([]byte, 32)}, - Target: ðpbalpha.Checkpoint{Epoch: 55, Root: make([]byte, 32)}, - } - - url := fmt.Sprintf("http://example.com?slot=%d&committee_index=%d", slot, 1) - request := httptest.NewRequest(http.MethodGet, url, nil) - writer := httptest.NewRecorder() - writer.Body = &bytes.Buffer{} - - requestPb := ðpbalpha.AttestationDataRequest{ - CommitteeIndex: 1, - Slot: slot, - } - - require.NoError(t, s.CoreService.AttestationCache.MarkInProgress(requestPb)) - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - s.GetAttestationData(writer, request) - - assert.Equal(t, http.StatusOK, writer.Code) - resp := &GetAttestationDataResponse{} - require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp)) - require.NotNil(t, resp) - assert.DeepEqual(t, expectedResponse, resp) - }() - - wg.Add(1) - go func() { - defer wg.Done() - - assert.NoError(t, s.CoreService.AttestationCache.Put(ctx, requestPb, expectedResponsePb)) - assert.NoError(t, s.CoreService.AttestationCache.MarkNotInProgress(requestPb)) - }() - - wg.Wait() - }) - t.Run("invalid slot", func(t *testing.T) { slot := 3*params.BeaconConfig().SlotsPerEpoch + 1 offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) chain := &mockChain.ChainService{ - Optimistic: false, - Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + Optimistic: false, + Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + CurrentJustifiedCheckPoint: ðpbalpha.Checkpoint{}, } s := &Server{ @@ -1055,6 +961,7 @@ func TestGetAttestationData(t *testing.T) { OptimisticModeFetcher: chain, CoreService: &core.Service{ GenesisTimeFetcher: chain, + FinalizedFetcher: chain, }, } @@ -1072,7 +979,7 @@ func TestGetAttestationData(t *testing.T) { assert.Equal(t, true, strings.Contains(e.Message, "invalid request")) }) - t.Run("head state slot greater than request slot", func(t *testing.T) { + t.Run("request slot is not current slot", func(t *testing.T) { ctx := context.Background() db := dbutil.SetupDB(t) @@ -1092,37 +999,17 @@ func TestGetAttestationData(t *testing.T) { util.SaveBlock(t, ctx, db, block2) justifiedRoot, err := justifiedBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not get signing root for justified block") - targetRoot, err := targetBlock.Block.HashTreeRoot() - require.NoError(t, err, "Could not get signing root for target block") - - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) - require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix()-offset))) - err = beaconState.SetLatestBlockHeader(util.HydrateBeaconHeader(ðpbalpha.BeaconBlockHeader{ - ParentRoot: blockRoot2[:], - })) - require.NoError(t, err) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{ + justifiedCheckpoint := ðpbalpha.Checkpoint{ Epoch: 2, Root: justifiedRoot[:], - }) - require.NoError(t, err) - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:] - blockRoots[3*params.BeaconConfig().SlotsPerEpoch] = blockRoot2[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) + } - beaconstate := beaconState.Copy() - require.NoError(t, beaconstate.SetSlot(beaconstate.Slot()-1)) - require.NoError(t, db.SaveState(ctx, beaconstate, blockRoot2)) + offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) chain := &mockChain.ChainService{ - State: beaconState, - Root: blockRoot[:], - Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + Root: blockRoot[:], + Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + CurrentJustifiedCheckPoint: justifiedCheckpoint, + TargetRoot: blockRoot2, } s := &Server{ @@ -1131,45 +1018,20 @@ func TestGetAttestationData(t *testing.T) { TimeFetcher: chain, OptimisticModeFetcher: chain, CoreService: &core.Service{ - AttestationCache: cache.NewAttestationCache(), HeadFetcher: chain, GenesisTimeFetcher: chain, StateGen: stategen.New(db, doublylinkedtree.New()), + FinalizedFetcher: chain, }, } - require.NoError(t, db.SaveState(ctx, beaconState, blockRoot)) - util.SaveBlock(t, ctx, db, block) - require.NoError(t, db.SaveHeadBlockRoot(ctx, blockRoot)) - url := fmt.Sprintf("http://example.com?slot=%d&committee_index=%d", slot-1, 0) request := httptest.NewRequest(http.MethodGet, url, nil) writer := httptest.NewRecorder() writer.Body = &bytes.Buffer{} s.GetAttestationData(writer, request) - - expectedResponse := &GetAttestationDataResponse{ - Data: &shared.AttestationData{ - Slot: strconv.FormatUint(uint64(slot-1), 10), - CommitteeIndex: strconv.FormatUint(0, 10), - BeaconBlockRoot: hexutil.Encode(blockRoot2[:]), - Source: &shared.Checkpoint{ - Epoch: strconv.FormatUint(2, 10), - Root: hexutil.Encode(justifiedRoot[:]), - }, - Target: &shared.Checkpoint{ - Epoch: strconv.FormatUint(3, 10), - Root: hexutil.Encode(blockRoot2[:]), - }, - }, - } - - assert.Equal(t, http.StatusOK, writer.Code) - resp := &GetAttestationDataResponse{} - require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp)) - require.NotNil(t, resp) - assert.DeepEqual(t, expectedResponse, resp) + assert.Equal(t, http.StatusBadRequest, writer.Code) }) t.Run("succeeds in first epoch", func(t *testing.T) { @@ -1184,27 +1046,18 @@ func TestGetAttestationData(t *testing.T) { require.NoError(t, err, "Could not hash beacon block") justifiedRoot, err := justifiedBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not get signing root for justified block") - targetRoot, err := targetBlock.Block.HashTreeRoot() - require.NoError(t, err, "Could not get signing root for target block") - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{ + justifiedCheckpt := ðpbalpha.Checkpoint{ Epoch: 0, Root: justifiedRoot[:], - }) + } require.NoError(t, err) - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) chain := &mockChain.ChainService{ - State: beaconState, - Root: blockRoot[:], - Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + Root: blockRoot[:], + Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + CurrentJustifiedCheckPoint: justifiedCheckpt, + TargetRoot: blockRoot, } s := &Server{ @@ -1216,6 +1069,7 @@ func TestGetAttestationData(t *testing.T) { AttestationCache: cache.NewAttestationCache(), HeadFetcher: chain, GenesisTimeFetcher: chain, + FinalizedFetcher: chain, }, } @@ -1279,28 +1133,17 @@ func TestGetAttestationData(t *testing.T) { require.NoError(t, err, "Could not hash beacon block") justifiedBlockRoot, err := justifiedBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not hash justified block") - epochBoundaryRoot, err := epochBoundaryBlock.Block.HashTreeRoot() - require.NoError(t, err, "Could not hash justified block") - slot := primitives.Slot(10000) - - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpbalpha.Checkpoint{ + justifiedCheckpt := ðpbalpha.Checkpoint{ Epoch: slots.ToEpoch(1500), Root: justifiedBlockRoot[:], - }) - require.NoError(t, err) - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = epochBoundaryRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedBlockRoot[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) + } + slot := primitives.Slot(10000) offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) chain := &mockChain.ChainService{ - State: beaconState, - Root: blockRoot[:], - Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + Root: blockRoot[:], + Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + CurrentJustifiedCheckPoint: justifiedCheckpt, + TargetRoot: blockRoot, } s := &Server{ @@ -1312,6 +1155,7 @@ func TestGetAttestationData(t *testing.T) { AttestationCache: cache.NewAttestationCache(), HeadFetcher: chain, GenesisTimeFetcher: chain, + FinalizedFetcher: chain, }, } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_mainnet_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_mainnet_test.go index b8040e957f..d4e13e0ee5 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_mainnet_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_mainnet_test.go @@ -48,23 +48,13 @@ func TestAttestationDataAtSlot_HandlesFarAwayJustifiedEpoch(t *testing.T) { require.NoError(t, err, "Could not hash beacon block") justifiedBlockRoot, err := justifiedBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not hash justified block") - epochBoundaryRoot, err := epochBoundaryBlock.Block.HashTreeRoot() - require.NoError(t, err, "Could not hash justified block") - slot := primitives.Slot(10000) - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpb.Checkpoint{ + justifiedCheckpoint := ðpb.Checkpoint{ Epoch: slots.ToEpoch(1500), Root: justifiedBlockRoot[:], - }) + } require.NoError(t, err) - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = epochBoundaryRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedBlockRoot[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) + slot := primitives.Slot(10000) offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) attesterServer := &Server{ SyncChecker: &mockSync.Sync{IsSyncing: false}, @@ -72,8 +62,9 @@ func TestAttestationDataAtSlot_HandlesFarAwayJustifiedEpoch(t *testing.T) { TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, CoreService: &core.Service{ AttestationCache: cache.NewAttestationCache(), - HeadFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:]}, + HeadFetcher: &mock.ChainService{TargetRoot: blockRoot, Root: blockRoot[:]}, GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint}, }, } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go index f7985e79af..e1a5cfd5c6 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go @@ -14,7 +14,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations" mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" - state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing" "github.com/prysmaticlabs/prysm/v4/config/params" @@ -93,42 +92,32 @@ func TestGetAttestationData_OK(t *testing.T) { block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1 targetBlock := util.NewBeaconBlock() targetBlock.Block.Slot = 1 * params.BeaconConfig().SlotsPerEpoch + targetRoot, err := targetBlock.Block.HashTreeRoot() + require.NoError(t, err, "Could not get signing root for target block") + justifiedBlock := util.NewBeaconBlock() justifiedBlock.Block.Slot = 2 * params.BeaconConfig().SlotsPerEpoch blockRoot, err := block.Block.HashTreeRoot() require.NoError(t, err, "Could not hash beacon block") justifiedRoot, err := justifiedBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not get signing root for justified block") - targetRoot, err := targetBlock.Block.HashTreeRoot() - require.NoError(t, err, "Could not get signing root for target block") slot := 3*params.BeaconConfig().SlotsPerEpoch + 1 - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpb.Checkpoint{ + justifiedCheckpoint := ðpb.Checkpoint{ Epoch: 2, Root: justifiedRoot[:], - }) - require.NoError(t, err) - - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) + } offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) attesterServer := &Server{ SyncChecker: &mockSync.Sync{IsSyncing: false}, OptimisticModeFetcher: &mock.ChainService{Optimistic: false}, TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, CoreService: &core.Service{ - AttestationCache: cache.NewAttestationCache(), - HeadFetcher: &mock.ChainService{ - State: beaconState, Root: blockRoot[:], - }, + HeadFetcher: &mock.ChainService{TargetRoot: targetRoot, Root: blockRoot[:]}, GenesisTimeFetcher: &mock.ChainService{ Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), }, + FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint}, + AttestationCache: cache.NewAttestationCache(), }, } @@ -148,7 +137,7 @@ func TestGetAttestationData_OK(t *testing.T) { }, Target: ðpb.Checkpoint{ Epoch: 3, - Root: blockRoot[:], + Root: targetRoot[:], }, } @@ -157,6 +146,63 @@ func TestGetAttestationData_OK(t *testing.T) { } } +func BenchmarkGetAttestationDataConcurrent(b *testing.B) { + block := util.NewBeaconBlock() + block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1 + targetBlock := util.NewBeaconBlock() + targetBlock.Block.Slot = 1 * params.BeaconConfig().SlotsPerEpoch + targetRoot, err := targetBlock.Block.HashTreeRoot() + require.NoError(b, err, "Could not get signing root for target block") + + justifiedBlock := util.NewBeaconBlock() + justifiedBlock.Block.Slot = 2 * params.BeaconConfig().SlotsPerEpoch + blockRoot, err := block.Block.HashTreeRoot() + require.NoError(b, err, "Could not hash beacon block") + justifiedRoot, err := justifiedBlock.Block.HashTreeRoot() + require.NoError(b, err, "Could not get signing root for justified block") + slot := 3*params.BeaconConfig().SlotsPerEpoch + 1 + justifiedCheckpoint := ðpb.Checkpoint{ + Epoch: 2, + Root: justifiedRoot[:], + } + offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) + attesterServer := &Server{ + SyncChecker: &mockSync.Sync{IsSyncing: false}, + OptimisticModeFetcher: &mock.ChainService{Optimistic: false}, + TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + CoreService: &core.Service{ + AttestationCache: cache.NewAttestationCache(), + HeadFetcher: &mock.ChainService{TargetRoot: targetRoot, Root: blockRoot[:]}, + GenesisTimeFetcher: &mock.ChainService{ + Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second), + }, + FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint}, + }, + } + + req := ðpb.AttestationDataRequest{ + CommitteeIndex: 0, + Slot: 3*params.BeaconConfig().SlotsPerEpoch + 1, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(5000) // for 5000 concurrent accesses + + for j := 0; j < 5000; j++ { + go func() { + defer wg.Done() + _, err := attesterServer.GetAttestationData(context.Background(), req) + require.NoError(b, err, "Could not get attestation info at slot") + }() + } + wg.Wait() // Wait for all goroutines to finish + } + + b.Log("Elapsed time:", b.Elapsed()) +} + func TestGetAttestationData_SyncNotReady(t *testing.T) { as := Server{ SyncChecker: &mockSync.Sync{IsSyncing: true}, @@ -178,6 +224,7 @@ func TestGetAttestationData_Optimistic(t *testing.T) { CoreService: &core.Service{ GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now()}, HeadFetcher: &mock.ChainService{}, + AttestationCache: cache.NewAttestationCache(), }, } _, err := as.GetAttestationData(context.Background(), ðpb.AttestationDataRequest{}) @@ -193,68 +240,16 @@ func TestGetAttestationData_Optimistic(t *testing.T) { OptimisticModeFetcher: &mock.ChainService{Optimistic: false}, TimeFetcher: &mock.ChainService{Genesis: time.Now()}, CoreService: &core.Service{ + AttestationCache: cache.NewAttestationCache(), GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now()}, HeadFetcher: &mock.ChainService{Optimistic: false, State: beaconState}, - AttestationCache: cache.NewAttestationCache(), + FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: ðpb.Checkpoint{}}, }, } _, err = as.GetAttestationData(context.Background(), ðpb.AttestationDataRequest{}) require.NoError(t, err) } -func TestAttestationDataSlot_handlesInProgressRequest(t *testing.T) { - s := ðpb.BeaconState{Slot: 100} - state, err := state_native.InitializeFromProtoPhase0(s) - require.NoError(t, err) - ctx := context.Background() - slot := primitives.Slot(2) - offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) - server := &Server{ - SyncChecker: &mockSync.Sync{IsSyncing: false}, - OptimisticModeFetcher: &mock.ChainService{Optimistic: false}, - TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - CoreService: &core.Service{ - AttestationCache: cache.NewAttestationCache(), - HeadFetcher: &mock.ChainService{State: state}, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - }, - } - - req := ðpb.AttestationDataRequest{ - CommitteeIndex: 1, - Slot: slot, - } - - res := ðpb.AttestationData{ - CommitteeIndex: 1, - Target: ðpb.Checkpoint{Epoch: 55, Root: make([]byte, 32)}, - } - - require.NoError(t, server.CoreService.AttestationCache.MarkInProgress(req)) - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - response, err := server.GetAttestationData(ctx, req) - require.NoError(t, err) - if !proto.Equal(res, response) { - t.Error("Expected equal responses from cache") - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - - assert.NoError(t, server.CoreService.AttestationCache.Put(ctx, req, res)) - assert.NoError(t, server.CoreService.AttestationCache.MarkNotInProgress(req)) - }() - - wg.Wait() -} - func TestServer_GetAttestationData_InvalidRequestSlot(t *testing.T) { ctx := context.Background() @@ -276,11 +271,7 @@ func TestServer_GetAttestationData_InvalidRequestSlot(t *testing.T) { assert.ErrorContains(t, "invalid request", err) } -func TestServer_GetAttestationData_HeadStateSlotGreaterThanRequestSlot(t *testing.T) { - // There exists a rare scenario where the validator may request an attestation for a slot less - // than the head state's slot. The Ethereum consensus spec constraints require the block root the - // attestation is referencing be less than or equal to the attestation data slot. - // See: https://github.com/prysmaticlabs/prysm/issues/5164 +func TestServer_GetAttestationData_RequestSlotIsDifferentThanCurrentSlot(t *testing.T) { ctx := context.Background() db := dbutil.SetupDB(t) @@ -300,72 +291,30 @@ func TestServer_GetAttestationData_HeadStateSlotGreaterThanRequestSlot(t *testin util.SaveBlock(t, ctx, db, block2) justifiedRoot, err := justifiedBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not get signing root for justified block") - targetRoot, err := targetBlock.Block.HashTreeRoot() - require.NoError(t, err, "Could not get signing root for target block") - - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) - require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix()-offset))) - err = beaconState.SetLatestBlockHeader(util.HydrateBeaconHeader(ðpb.BeaconBlockHeader{ - ParentRoot: blockRoot2[:], - })) - require.NoError(t, err) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpb.Checkpoint{ + justifiedCheckpoint := ðpb.Checkpoint{ Epoch: 2, Root: justifiedRoot[:], - }) - require.NoError(t, err) - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:] - blockRoots[3*params.BeaconConfig().SlotsPerEpoch] = blockRoot2[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) - - beaconstate := beaconState.Copy() - require.NoError(t, beaconstate.SetSlot(beaconstate.Slot()-1)) - require.NoError(t, db.SaveState(ctx, beaconstate, blockRoot2)) - offset = int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) + } + offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) attesterServer := &Server{ SyncChecker: &mockSync.Sync{IsSyncing: false}, OptimisticModeFetcher: &mock.ChainService{Optimistic: false}, TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, CoreService: &core.Service{ - AttestationCache: cache.NewAttestationCache(), - HeadFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:]}, + HeadFetcher: &mock.ChainService{TargetRoot: blockRoot2, Root: blockRoot[:]}, GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, StateGen: stategen.New(db, doublylinkedtree.New()), + FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint}, }, } - require.NoError(t, db.SaveState(ctx, beaconState, blockRoot)) util.SaveBlock(t, ctx, db, block) - require.NoError(t, db.SaveHeadBlockRoot(ctx, blockRoot)) req := ðpb.AttestationDataRequest{ CommitteeIndex: 0, Slot: slot - 1, } - res, err := attesterServer.GetAttestationData(ctx, req) - require.NoError(t, err, "Could not get attestation info at slot") - - expectedInfo := ðpb.AttestationData{ - Slot: slot - 1, - BeaconBlockRoot: blockRoot2[:], - Source: ðpb.Checkpoint{ - Epoch: 2, - Root: justifiedRoot[:], - }, - Target: ðpb.Checkpoint{ - Epoch: 3, - Root: blockRoot2[:], - }, - } - - if !proto.Equal(res, expectedInfo) { - t.Errorf("Expected attestation info to match, received %v, wanted %v", res, expectedInfo) - } + _, err = attesterServer.GetAttestationData(ctx, req) + require.ErrorContains(t, "invalid request: slot 24 is not the current slot 25", err) } func TestGetAttestationData_SucceedsInFirstEpoch(t *testing.T) { @@ -383,19 +332,10 @@ func TestGetAttestationData_SucceedsInFirstEpoch(t *testing.T) { targetRoot, err := targetBlock.Block.HashTreeRoot() require.NoError(t, err, "Could not get signing root for target block") - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - require.NoError(t, beaconState.SetSlot(slot)) - err = beaconState.SetCurrentJustifiedCheckpoint(ðpb.Checkpoint{ + justifiedCheckpoint := ðpb.Checkpoint{ Epoch: 0, Root: justifiedRoot[:], - }) - require.NoError(t, err) - blockRoots := beaconState.BlockRoots() - blockRoots[1] = blockRoot[:] - blockRoots[1*params.BeaconConfig().SlotsPerEpoch] = targetRoot[:] - blockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:] - require.NoError(t, beaconState.SetBlockRoots(blockRoots)) + } offset := int64(slot.Mul(params.BeaconConfig().SecondsPerSlot)) attesterServer := &Server{ SyncChecker: &mockSync.Sync{IsSyncing: false}, @@ -404,9 +344,10 @@ func TestGetAttestationData_SucceedsInFirstEpoch(t *testing.T) { CoreService: &core.Service{ AttestationCache: cache.NewAttestationCache(), HeadFetcher: &mock.ChainService{ - State: beaconState, Root: blockRoot[:], + TargetRoot: targetRoot, Root: blockRoot[:], }, GenesisTimeFetcher: &mock.ChainService{Genesis: prysmTime.Now().Add(time.Duration(-1*offset) * time.Second)}, + FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint}, }, } @@ -426,7 +367,7 @@ func TestGetAttestationData_SucceedsInFirstEpoch(t *testing.T) { }, Target: ðpb.Checkpoint{ Epoch: 0, - Root: blockRoot[:], + Root: targetRoot[:], }, } diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index bb8e7ddfa6..36061b83d3 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -367,6 +367,7 @@ func (s *Service) Start() { AttestationCache: cache.NewAttestationCache(), StateGen: s.cfg.StateGen, P2P: s.cfg.Broadcaster, + FinalizedFetcher: s.cfg.FinalizationFetcher, } validatorServer := &validatorv1alpha1.Server{