mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 07:58:22 -05:00
Moved delay att inclusion to fork choice service (#3345)
This commit is contained in:
@@ -27,7 +27,6 @@ go_library(
|
||||
"//proto/eth/v1alpha1:go_default_library",
|
||||
"//shared/bytesutil:go_default_library",
|
||||
"//shared/event:go_default_library",
|
||||
"//shared/params:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
|
||||
@@ -51,7 +51,7 @@ import (
|
||||
// for i in indexed_attestation.custody_bit_0_indices + indexed_attestation.custody_bit_1_indices:
|
||||
// if i not in store.latest_messages or target.epoch > store.latest_messages[i].epoch:
|
||||
// store.latest_messages[i] = LatestMessage(epoch=target.epoch, root=attestation.data.beacon_block_root)
|
||||
func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) error {
|
||||
func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) (uint64, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "forkchoice.onAttestation")
|
||||
defer span.End()
|
||||
|
||||
@@ -60,44 +60,49 @@ func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) error {
|
||||
|
||||
// Verify beacon node has seen the target block before.
|
||||
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(tgt.Root)) {
|
||||
return fmt.Errorf("target root %#x does not exist in db", bytesutil.Trunc(tgt.Root))
|
||||
return 0, fmt.Errorf("target root %#x does not exist in db", bytesutil.Trunc(tgt.Root))
|
||||
}
|
||||
|
||||
// Verify attestation target has had a valid pre state produced by the target block.
|
||||
baseState, err := s.verifyAttPreState(ctx, tgt)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Verify Attestations cannot be from future epochs.
|
||||
slotTime := baseState.GenesisTime + tgtSlot*params.BeaconConfig().SecondsPerSlot
|
||||
currentTime := uint64(time.Now().Unix())
|
||||
if slotTime > currentTime {
|
||||
return fmt.Errorf("could not process attestation from the future epoch, time %d > time %d", slotTime, currentTime)
|
||||
return 0, fmt.Errorf("could not process attestation from the future epoch, time %d > time %d", slotTime, currentTime)
|
||||
}
|
||||
|
||||
// Store target checkpoint state if not yet seen.
|
||||
baseState, err = s.saveCheckpointState(ctx, baseState, tgt)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Delay attestation processing until the subsequent slot.
|
||||
if err := s.waitForAttInclDelay(ctx, a, baseState); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Verify attestations can only affect the fork choice of subsequent slots.
|
||||
if err := s.verifyAttSlotTime(ctx, baseState, a.Data); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Use the target state to to validate attestation and calculate the committees.
|
||||
indexedAtt, err := s.verifyAttestation(ctx, baseState, a)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Update every validator's latest vote.
|
||||
if err := s.updateAttVotes(ctx, indexedAtt, tgt.Root, tgt.Epoch); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
return nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// verifyAttPreState validates input attested check point has a valid pre-state.
|
||||
@@ -138,6 +143,26 @@ func (s *Store) saveCheckpointState(ctx context.Context, baseState *pb.BeaconSta
|
||||
return targetState, nil
|
||||
}
|
||||
|
||||
// waitForAttInclDelay waits until the next slot because attestation can only affect
|
||||
// fork choice of subsequent slot. This is to delay attestation inclusion for fork choice
|
||||
// until the attested slot is in the past.
|
||||
func (s *Store) waitForAttInclDelay(ctx context.Context, a *ethpb.Attestation, targetState *pb.BeaconState) error {
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.forkchoice.waitForAttInclDelay")
|
||||
defer span.End()
|
||||
|
||||
slot, err := helpers.AttestationDataSlot(targetState, a.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get attestation slot")
|
||||
}
|
||||
|
||||
nextSlot := slot + 1
|
||||
duration := time.Duration(nextSlot*params.BeaconConfig().SecondsPerSlot)*time.Second + time.Millisecond
|
||||
timeToInclude := time.Unix(int64(targetState.GenesisTime), 0).Add(duration)
|
||||
|
||||
time.Sleep(time.Until(timeToInclude))
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyAttSlotTime validates input attestation is not from the future.
|
||||
func (s *Store) verifyAttSlotTime(ctx context.Context, baseState *pb.BeaconState, d *ethpb.AttestationData) error {
|
||||
aSlot, err := helpers.AttestationDataSlot(baseState, d)
|
||||
|
||||
@@ -109,7 +109,7 @@ func TestStore_OnAttestation(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err := store.OnAttestation(ctx, tt.a)
|
||||
_, err := store.OnAttestation(ctx, tt.a)
|
||||
if tt.wantErr {
|
||||
if !strings.Contains(err.Error(), tt.wantErrString) {
|
||||
t.Errorf("Store.OnAttestation() error = %v, wantErr = %v", err, tt.wantErrString)
|
||||
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
type ForkChoicer interface {
|
||||
Head(ctx context.Context) ([]byte, error)
|
||||
OnBlock(ctx context.Context, b *ethpb.BeaconBlock) error
|
||||
OnAttestation(ctx context.Context, a *ethpb.Attestation) error
|
||||
OnAttestation(ctx context.Context, a *ethpb.Attestation) (uint64, error)
|
||||
GenesisStore(ctx context.Context, genesisState *pb.BeaconState) error
|
||||
FinalizedCheckpt() *ethpb.Checkpoint
|
||||
}
|
||||
|
||||
@@ -4,14 +4,12 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
@@ -64,18 +62,14 @@ func (c *ChainService) ReceiveAttestationNoPubsub(ctx context.Context, att *ethp
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveAttestationNoPubsub")
|
||||
defer span.End()
|
||||
|
||||
// Delay attestation inclusion until the attested slot is in the past.
|
||||
slot, err := c.waitForAttInclDelay(ctx, att)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not delay attestation inclusion")
|
||||
}
|
||||
|
||||
// Update forkchoice store for the new attestation
|
||||
if err := c.forkChoiceStore.OnAttestation(ctx, att); err != nil {
|
||||
attSlot, err := c.forkChoiceStore.OnAttestation(ctx, att)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not process block from fork choice service")
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"attSlot": slot,
|
||||
"attSlot": attSlot,
|
||||
"attDataRoot": hex.EncodeToString(att.Data.BeaconBlockRoot),
|
||||
}).Debug("Finished updating fork choice store for attestation")
|
||||
|
||||
@@ -94,7 +88,7 @@ func (c *ChainService) ReceiveAttestationNoPubsub(ctx context.Context, att *ethp
|
||||
}).Debug("Finished applying fork choice for attestation")
|
||||
|
||||
// Skip checking for competing attestation's target roots at epoch boundary.
|
||||
if helpers.IsEpochEnd(slot) {
|
||||
if !helpers.IsEpochStart(attSlot) {
|
||||
targetRoot, err := helpers.BlockRoot(c.headState, att.Data.Target.Epoch)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not get target root for epoch %d", att.Data.Target.Epoch)
|
||||
@@ -111,30 +105,6 @@ func (c *ChainService) ReceiveAttestationNoPubsub(ctx context.Context, att *ethp
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForAttInclDelay waits until the next slot because attestation can only affect
|
||||
// fork choice of subsequent slot. This is to delay attestation inclusion for fork choice
|
||||
// until the attested slot is in the past.
|
||||
func (c *ChainService) waitForAttInclDelay(ctx context.Context, a *ethpb.Attestation) (uint64, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.forkchoice.waitForAttInclDelay")
|
||||
defer span.End()
|
||||
|
||||
s, err := c.beaconDB.State(ctx, bytesutil.ToBytes32(a.Data.Target.Root))
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "could not get state")
|
||||
}
|
||||
slot, err := helpers.AttestationDataSlot(s, a.Data)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "could not get attestation slot")
|
||||
}
|
||||
|
||||
nextSlot := slot + 1
|
||||
duration := time.Duration(nextSlot*params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||
timeToInclude := time.Unix(int64(s.GenesisTime), 0).Add(duration)
|
||||
|
||||
time.Sleep(time.Until(timeToInclude))
|
||||
return slot, nil
|
||||
}
|
||||
|
||||
// This checks if the attestation is from a competing chain, emits warning and updates metrics.
|
||||
func isCompetingAtts(headTargetRoot []byte, attTargetRoot []byte) {
|
||||
if !bytes.Equal(attTargetRoot, headTargetRoot) {
|
||||
|
||||
@@ -46,8 +46,8 @@ func (s *store) OnBlock(ctx context.Context, b *ethpb.BeaconBlock) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) OnAttestation(ctx context.Context, a *ethpb.Attestation) error {
|
||||
return nil
|
||||
func (s *store) OnAttestation(ctx context.Context, a *ethpb.Attestation) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (s *store) GenesisStore(ctx context.Context, genesisState *pb.BeaconState) error {
|
||||
|
||||
Reference in New Issue
Block a user