Cached head root retrieve from DB on miss (#4552)

This commit is contained in:
terence tsao
2020-01-14 21:02:02 -08:00
committed by GitHub
parent d0793f00c5
commit 5ab1efb537
12 changed files with 87 additions and 29 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/epoch/precompute"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -29,7 +30,7 @@ type GenesisTimeFetcher interface {
// directly retrieves head related data.
type HeadFetcher interface {
HeadSlot() uint64
HeadRoot() []byte
HeadRoot(ctx context.Context) ([]byte, error)
HeadBlock() *ethpb.SignedBeaconBlock
HeadState(ctx context.Context) (*pb.BeaconState, error)
HeadValidatorsIndices(epoch uint64) ([]uint64, error)
@@ -109,16 +110,29 @@ func (s *Service) HeadSlot() uint64 {
}
// HeadRoot returns the root of the head of the chain.
func (s *Service) HeadRoot() []byte {
func (s *Service) HeadRoot(ctx context.Context) ([]byte, error) {
s.headLock.RLock()
defer s.headLock.RUnlock()
root := s.canonicalRoots[s.headSlot]
if len(root) != 0 {
return root
return root, nil
}
return params.BeaconConfig().ZeroHash[:]
b, err := s.beaconDB.HeadBlock(ctx)
if err != nil {
return nil, err
}
if b == nil {
return params.BeaconConfig().ZeroHash[:], nil
}
r, err := ssz.HashTreeRoot(b.Block)
if err != nil {
return nil, err
}
return r[:], nil
}
// HeadBlock returns the head block of the chain.

View File

@@ -39,7 +39,9 @@ func TestHeadRoot_DataRace(t *testing.T) {
[32]byte{},
)
}()
s.HeadRoot()
if _, err := s.HeadRoot(context.Background()); err != nil {
t.Fatal(err)
}
}
func TestHeadBlock_DataRace(t *testing.T) {

View File

@@ -33,7 +33,11 @@ func TestHeadRoot_Nil(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
c := setupBeaconChain(t, db)
if !bytes.Equal(c.HeadRoot(), params.BeaconConfig().ZeroHash[:]) {
headRoot, err := c.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(headRoot, params.BeaconConfig().ZeroHash[:]) {
t.Error("Incorrect pre chain start value")
}
}
@@ -131,8 +135,12 @@ func TestHeadRoot_CanRetrieve(t *testing.T) {
c := &Service{canonicalRoots: make(map[uint64][]byte)}
c.headSlot = 100
c.canonicalRoots[c.headSlot] = []byte{'A'}
if !bytes.Equal([]byte{'A'}, c.HeadRoot()) {
t.Errorf("Wanted head root: %v, got: %d", []byte{'A'}, c.HeadRoot())
headRoot, err := c.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal([]byte{'A'}, headRoot) {
t.Errorf("Wanted head root: %v, got: %d", []byte{'A'}, headRoot)
}
}

View File

@@ -15,10 +15,6 @@ var (
Name: "beacon_head_slot",
Help: "Slot of the head block of the beacon chain",
})
beaconHeadRoot = promauto.NewGauge(prometheus.GaugeOpts{
Name: "beacon_head_root",
Help: "Root of the head block of the beacon chain, it returns the lowest 8 bytes interpreted as little endian",
})
competingAtts = promauto.NewCounter(prometheus.CounterOpts{
Name: "competing_attestations",
Help: "The # of attestations received and processed from a competing chain",
@@ -60,7 +56,6 @@ var (
func (s *Service) reportSlotMetrics(currentSlot uint64) {
beaconSlot.Set(float64(currentSlot))
beaconHeadSlot.Set(float64(s.HeadSlot()))
beaconHeadRoot.Set(float64(bytesutil.ToLowInt64(s.HeadRoot())))
if s.headState != nil {
headFinalizedEpoch.Set(float64(s.headState.FinalizedCheckpoint.Epoch))
headFinalizedRoot.Set(float64(bytesutil.ToLowInt64(s.headState.FinalizedCheckpoint.Root)))

View File

@@ -42,7 +42,11 @@ func (s *Service) ReceiveAttestationNoPubsub(ctx context.Context, att *ethpb.Att
return errors.Wrap(err, "could not get head from fork choice service")
}
// Only save head if it's different than the current head.
if !bytes.Equal(headRoot, s.HeadRoot()) {
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(headRoot, cachedHeadRoot) {
signed, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(headRoot))
if err != nil {
return errors.Wrap(err, "could not compute state from block head")

View File

@@ -87,7 +87,11 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
}
// Only save head if it's different than the current head.
if !bytes.Equal(headRoot, s.HeadRoot()) {
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(headRoot, cachedHeadRoot) {
signedHeadBlock, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(headRoot))
if err != nil {
return errors.Wrap(err, "could not compute state from block head")
@@ -152,8 +156,11 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
if err != nil {
return errors.Wrap(err, "could not get signing root on received block")
}
if !bytes.Equal(root[:], s.HeadRoot()) {
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHead(ctx, blockCopy, root); err != nil {
return errors.Wrap(err, "could not save head")
}
@@ -199,8 +206,13 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
return errors.Wrap(err, "could not get signing root on received blockCopy")
}
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if featureconfig.Get().InitSyncCacheState {
if !bytes.Equal(root[:], s.HeadRoot()) {
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHeadNoDB(ctx, blockCopy, root); err != nil {
err := errors.Wrap(err, "could not save head")
traceutil.AnnotateError(span, err)
@@ -208,7 +220,7 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
}
}
} else {
if !bytes.Equal(root[:], s.HeadRoot()) {
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHead(ctx, blockCopy, root); err != nil {
err := errors.Wrap(err, "could not save head")
traceutil.AnnotateError(span, err)

View File

@@ -97,7 +97,11 @@ func TestReceiveReceiveBlockNoPubsub_CanSaveHeadInfo(t *testing.T) {
t.Fatal(err)
}
if !bytes.Equal(r[:], chainService.HeadRoot()) {
headRoot, err := chainService.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(r[:], headRoot) {
t.Error("Incorrect head root saved")
}

View File

@@ -332,7 +332,11 @@ func TestChainService_InitializeChainInfo(t *testing.T) {
if headBlock.Block.Slot != c.HeadSlot() {
t.Error("head slot incorrect")
}
if !bytes.Equal(headRoot[:], c.HeadRoot()) {
r, err := c.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(headRoot[:], r) {
t.Error("head slot incorrect")
}
if c.genesisRoot != genesisRoot {

View File

@@ -128,8 +128,8 @@ func (ms *ChainService) HeadSlot() uint64 {
}
// HeadRoot mocks HeadRoot method in chain service.
func (ms *ChainService) HeadRoot() []byte {
return ms.Root
func (ms *ChainService) HeadRoot(ctx context.Context) ([]byte, error) {
return ms.Root, nil
}

View File

@@ -60,7 +60,10 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve head state: %v", err)
}
headRoot := vs.HeadFetcher.HeadRoot()
headRoot, err := vs.HeadFetcher.HeadRoot(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err)
}
if helpers.CurrentEpoch(headState) < helpers.SlotToEpoch(req.Slot) {
headState, err = state.ProcessSlots(ctx, headState, helpers.StartSlot(helpers.SlotToEpoch(req.Slot)))

View File

@@ -36,8 +36,10 @@ func (vs *Server) GetBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb
}
// Retrieve the parent block as the current head of the canonical chain.
parentRoot := vs.HeadFetcher.HeadRoot()
parentRoot, err := vs.HeadFetcher.HeadRoot(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err)
}
eth1Data, err := vs.eth1Data(ctx, req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get ETH1 data: %v", err)

View File

@@ -57,11 +57,16 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
headRoot, err := r.chain.HeadRoot(ctx)
if err != nil {
return err
}
resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: r.chain.HeadRoot(),
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}
stream, err := r.p2p.Send(ctx, resp, id)
@@ -130,18 +135,23 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
}
r.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), m)
headRoot, err := r.chain.HeadRoot(ctx)
if err != nil {
return err
}
resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: r.chain.HeadRoot(),
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
log.WithError(err).Error("Failed to write to stream")
}
_, err := r.p2p.Encoding().EncodeWithLength(stream, resp)
_, err = r.p2p.Encoding().EncodeWithLength(stream, resp)
return err
}