Implement attestation pool in memory (#3542)

* Run time bug

* Still failing

* Run time working

* Run time working

* Gazelle

* Fixed all the tests

* Revert config

* Revert back test configs

* Revert config

* Tested run time again, everything is good
This commit is contained in:
terence tsao
2019-09-23 09:11:44 -07:00
committed by Raul Jordan
parent 64795bd231
commit ab2d4e8ad6
7 changed files with 69 additions and 69 deletions

View File

@@ -59,7 +59,7 @@ func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) (uint64
ctx, span := trace.StartSpan(ctx, "forkchoice.onAttestation")
defer span.End()
tgt := a.Data.Target
tgt := proto.Clone(a.Data.Target).(*ethpb.Checkpoint)
tgtSlot := helpers.StartSlot(tgt.Epoch)
// Verify beacon node has seen the target block before.
@@ -257,7 +257,7 @@ func (s *Store) updateAttVotes(
if err != nil {
return errors.Wrapf(err, "could not get latest vote for validator %d", i)
}
if !s.db.HasValidatorLatestVote(ctx, i) || tgtEpoch > vote.Epoch {
if vote == nil || tgtEpoch > vote.Epoch {
if err := s.db.SaveValidatorLatestVote(ctx, i, &pb.ValidatorLatestVote{
Epoch: tgtEpoch,
Root: tgtRoot,

View File

@@ -169,6 +169,12 @@ func (s *Store) updateBlockAttestationVote(ctx context.Context, att *ethpb.Attes
if err != nil {
return errors.Wrap(err, "could not get state for attestation tgt root")
}
if err := s.waitForAttInclDelay(ctx, att, baseState); err != nil {
return errors.Wrap(err, "could not wait for attestation inclusion delay")
}
if err := s.verifyAttSlotTime(ctx, baseState, att.Data); err != nil {
return errors.Wrap(err, "could not verify attestation slot time")
}
indexedAtt, err := blocks.ConvertToIndexed(baseState, att)
if err != nil {
return errors.Wrap(err, "could not convert attestation to indexed attestation")
@@ -178,7 +184,7 @@ func (s *Store) updateBlockAttestationVote(ctx context.Context, att *ethpb.Attes
if err != nil {
return errors.Wrapf(err, "could not get latest vote for validator %d", i)
}
if !s.db.HasValidatorLatestVote(ctx, i) || tgt.Epoch > vote.Epoch {
if vote == nil || tgt.Epoch > vote.Epoch {
if err := s.db.SaveValidatorLatestVote(ctx, i, &pb.ValidatorLatestVote{
Epoch: tgt.Epoch,
Root: tgt.Root,

View File

@@ -35,7 +35,7 @@ func (s *Service) ReceiveAttestation(ctx context.Context, att *ethpb.Attestation
return errors.Wrap(err, "could not broadcast attestation")
}
attRoot, err := ssz.HashTreeRoot(att)
attRoot, err := ssz.HashTreeRoot(att.Data)
if err != nil {
log.WithError(err).Error("Failed to hash attestation")
}

View File

@@ -4,7 +4,6 @@ package operations
import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -54,6 +53,8 @@ type Service struct {
incomingProcessedBlockFeed *event.Feed
incomingProcessedBlock chan *ethpb.BeaconBlock
error error
attestationPool map[[32]byte]*ethpb.Attestation
attestationPoolLock sync.Mutex
attestationLockCache *ccache.Cache
}
@@ -72,6 +73,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
beaconDB: cfg.BeaconDB,
incomingProcessedBlockFeed: new(event.Feed),
incomingProcessedBlock: make(chan *ethpb.BeaconBlock, params.BeaconConfig().DefaultBufferSize),
attestationPool: make(map[[32]byte]*ethpb.Attestation),
attestationLockCache: ccache.New(ccache.Configure()),
}
}
@@ -125,11 +127,11 @@ func (s *Service) retrieveLock(key [32]byte) *sync.Mutex {
// the attestations are returned in slot ascending order and up to MaxAttestations
// capacity. The attestations get deleted in DB after they have been retrieved.
func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]*ethpb.Attestation, error) {
var attestations []*ethpb.Attestation
atts, err := s.beaconDB.Attestations(ctx, nil /*filter*/)
if err != nil {
return nil, errors.New("could not retrieve attestations from DB")
}
s.attestationPoolLock.Lock()
defer s.attestationPoolLock.Unlock()
atts := make([]*ethpb.Attestation, 0, len(s.attestationPool))
bState, err := s.beaconDB.HeadState(ctx)
if err != nil {
return nil, errors.New("could not retrieve attestations from DB")
@@ -142,20 +144,15 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]
}
}
sort.Slice(atts, func(i, j int) bool {
return atts[i].Data.Target.Epoch < atts[j].Data.Target.Epoch
})
var validAttsCount uint64
for _, att := range atts {
for _, att := range s.attestationPool {
root, err := ssz.HashTreeRoot(att.Data)
if err != nil {
return nil, err
}
if _, err = blocks.ProcessAttestation(bState, att); err != nil {
hash, err := ssz.HashTreeRoot(att)
if err != nil {
return nil, err
}
if err := s.beaconDB.DeleteAttestation(ctx, hash); err != nil {
return nil, err
}
delete(s.attestationPool, root)
continue
}
@@ -164,9 +161,10 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]
if validAttsCount == params.BeaconConfig().MaxAttestations {
break
}
attestations = append(attestations, att)
atts = append(atts, att)
}
return attestations, nil
return atts, nil
}
// HandleValidatorExits processes a validator exit operation.
@@ -188,6 +186,9 @@ func (s *Service) HandleValidatorExits(ctx context.Context, message proto.Messag
// HandleAttestation processes a received attestation message.
func (s *Service) HandleAttestation(ctx context.Context, message proto.Message) error {
s.attestationPoolLock.Lock()
defer s.attestationPoolLock.Unlock()
ctx, span := trace.StartSpan(ctx, "operations.HandleAttestation")
defer span.End()
@@ -201,23 +202,18 @@ func (s *Service) HandleAttestation(ctx context.Context, message proto.Message)
lock.Lock()
defer lock.Unlock()
if s.beaconDB.HasAttestation(ctx, root) {
dbAtt, err := s.beaconDB.Attestation(ctx, root)
if err != nil {
return err
}
attestation, err = helpers.AggregateAttestation(dbAtt, attestation)
if err != nil {
return err
}
if err := s.beaconDB.SaveAttestation(ctx, attestation); err != nil {
return err
}
} else {
if err := s.beaconDB.SaveAttestation(ctx, attestation); err != nil {
return err
}
savedAtt, ok := s.attestationPool[root]
if !ok {
s.attestationPool[root] = attestation
return nil
}
savedAtt, err = helpers.AggregateAttestation(savedAtt, attestation)
if err != nil {
return err
}
s.attestationPool[root] = savedAtt
return nil
}
@@ -253,16 +249,18 @@ func (s *Service) handleProcessedBlock(ctx context.Context, message proto.Messag
// removeAttestationsFromPool removes a list of attestations from the DB
// after they have been included in a beacon block.
func (s *Service) removeAttestationsFromPool(ctx context.Context, attestations []*ethpb.Attestation) error {
s.attestationPoolLock.Lock()
defer s.attestationPoolLock.Unlock()
for _, attestation := range attestations {
root, err := ssz.HashTreeRoot(attestation.Data)
if err != nil {
return err
}
if s.beaconDB.HasAttestation(ctx, root) {
if err := s.beaconDB.DeleteAttestation(ctx, root); err != nil {
return err
}
_, ok := s.attestationPool[root]
if ok {
delete(s.attestationPool, root)
log.WithField("root", fmt.Sprintf("%#x", root)).Debug("Attestation removed from pool")
}
}

View File

@@ -168,6 +168,7 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) {
opsSrv := NewService(ctx, &Config{
BeaconDB: beaconDB,
})
opsSrv.attestationPool = make(map[[32]byte]*ethpb.Attestation)
// First, we create a common attestation data.
data := &ethpb.AttestationData{
@@ -254,12 +255,9 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) {
}
wg.Wait()
// We fetch the final attestation from the DB, which should be an aggregation of
// We fetch the final attestation from the attestation pool, which should be an aggregation of
// all committee members effectively.
aggAtt, err := beaconDB.Attestation(ctx, attDataRoot)
if err != nil {
t.Error(err)
}
aggAtt := opsSrv.attestationPool[attDataRoot]
b1 := aggAtt.AggregationBits.Bytes()
b2 := totalAggBits.Bytes()
@@ -278,11 +276,11 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) {
func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
defer dbutil.TeardownDB(t, beaconDB)
ctx := context.Background()
helpers.ClearAllCaches()
service := NewService(context.Background(), &Config{
BeaconDB: beaconDB,
})
service.attestationPool = make(map[[32]byte]*ethpb.Attestation)
deposits, privKeys := testutil.SetupInitialDeposits(t, 200)
beaconState, err := state.GenesisBeaconState(deposits, uint64(0), &ethpb.Eth1Data{})
@@ -411,10 +409,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T)
if err != nil {
t.Error(err)
}
dbAtt, err := service.beaconDB.Attestation(ctx, attDataHash)
if err != nil {
t.Error(err)
}
dbAtt := service.attestationPool[attDataHash]
dbAttBits := dbAtt.AggregationBits.Bytes()
aggregatedBits := att1.AggregationBits.Or(att2.AggregationBits).Bytes()
if !bytes.Equal(dbAttBits, aggregatedBits) {
@@ -428,10 +424,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T)
if err := service.HandleAttestation(context.Background(), att2); err != nil {
t.Error(err)
}
dbAtt, err = service.beaconDB.Attestation(ctx, attDataHash)
if err != nil {
t.Error(err)
}
dbAtt = service.attestationPool[attDataHash]
dbAttBits = dbAtt.AggregationBits.Bytes()
if !bytes.Equal(dbAttBits, aggregatedBits) {
t.Error("Expected aggregation bits to be equal.")
@@ -444,10 +438,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T)
if err := service.HandleAttestation(context.Background(), att3); err != nil {
t.Error(err)
}
dbAtt, err = service.beaconDB.Attestation(ctx, attDataHash)
if err != nil {
t.Error(err)
}
dbAtt = service.attestationPool[attDataHash]
dbAttBits = dbAtt.AggregationBits.Bytes()
if !bytes.Equal(dbAttBits, aggregatedBits) {
t.Error("Expected aggregation bits to be equal.")
@@ -463,6 +455,7 @@ func TestRetrieveAttestations_OK(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
defer dbutil.TeardownDB(t, beaconDB)
service := NewService(context.Background(), &Config{BeaconDB: beaconDB})
service.attestationPool = make(map[[32]byte]*ethpb.Attestation)
deposits, privKeys := testutil.SetupInitialDeposits(t, 100)
beaconState, err := state.GenesisBeaconState(deposits, uint64(0), &ethpb.Eth1Data{})
@@ -524,9 +517,9 @@ func TestRetrieveAttestations_OK(t *testing.T) {
}
att.Data.Crosslink.ParentRoot = encoded[:]
att.Data.Crosslink.DataRoot = params.BeaconConfig().ZeroHash[:]
if err := beaconDB.SaveAttestation(context.Background(), att); err != nil {
t.Fatal(err)
}
r, _ := ssz.HashTreeRoot(att.Data)
service.attestationPool[r] = att
headBlockRoot := [32]byte{1, 2, 3}
if err := beaconDB.SaveHeadBlockRoot(context.Background(), headBlockRoot); err != nil {

View File

@@ -36,6 +36,7 @@ go_library(
"//shared/params:go_default_library",
"//shared/trieutil:go_default_library",
"//shared/version:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//recovery:go_default_library",

View File

@@ -3,6 +3,7 @@ package rpc
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
@@ -39,13 +40,14 @@ func (as *AttesterServer) SubmitAttestation(ctx context.Context, att *ethpb.Atte
go func() {
ctx = trace.NewContext(context.Background(), trace.FromContext(ctx))
if err := as.operationsHandler.HandleAttestation(ctx, att); err != nil {
log.WithError(err).Error("could not handle attestation in operations service")
return
}
if err := as.attReceiver.ReceiveAttestation(ctx, att); err != nil {
log.WithError(err).Error("could not receive attestation in chain service")
}
attCopy := proto.Clone(att).(*ethpb.Attestation)
if err := as.operationsHandler.HandleAttestation(ctx, attCopy); err != nil {
log.WithError(err).Error("could not handle attestation in operations service")
return
}
}()
return &pb.AttestResponse{Root: root[:]}, nil