From ab2d4e8ad6b5d0875e27fcc18d50df783fe48ce9 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 23 Sep 2019 09:11:44 -0700 Subject: [PATCH] Implement attestation pool in memory (#3542) * Run time bug * Still failing * Run time working * Run time working * Gazelle * Fixed all the tests * Revert config * Revert back test configs * Revert config * Tested run time again, everything is good --- .../forkchoice/process_attestation.go | 4 +- .../blockchain/forkchoice/process_block.go | 8 +- .../blockchain/receive_attestation.go | 2 +- beacon-chain/operations/service.go | 78 +++++++++---------- beacon-chain/operations/service_test.go | 35 ++++----- beacon-chain/rpc/BUILD.bazel | 1 + beacon-chain/rpc/attester_server.go | 10 ++- 7 files changed, 69 insertions(+), 69 deletions(-) diff --git a/beacon-chain/blockchain/forkchoice/process_attestation.go b/beacon-chain/blockchain/forkchoice/process_attestation.go index 833a5de2d9..09fc7dc4e8 100644 --- a/beacon-chain/blockchain/forkchoice/process_attestation.go +++ b/beacon-chain/blockchain/forkchoice/process_attestation.go @@ -59,7 +59,7 @@ func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) (uint64 ctx, span := trace.StartSpan(ctx, "forkchoice.onAttestation") defer span.End() - tgt := a.Data.Target + tgt := proto.Clone(a.Data.Target).(*ethpb.Checkpoint) tgtSlot := helpers.StartSlot(tgt.Epoch) // Verify beacon node has seen the target block before. @@ -257,7 +257,7 @@ func (s *Store) updateAttVotes( if err != nil { return errors.Wrapf(err, "could not get latest vote for validator %d", i) } - if !s.db.HasValidatorLatestVote(ctx, i) || tgtEpoch > vote.Epoch { + if vote == nil || tgtEpoch > vote.Epoch { if err := s.db.SaveValidatorLatestVote(ctx, i, &pb.ValidatorLatestVote{ Epoch: tgtEpoch, Root: tgtRoot, diff --git a/beacon-chain/blockchain/forkchoice/process_block.go b/beacon-chain/blockchain/forkchoice/process_block.go index 600cdca0bf..fbf9da8df6 100644 --- a/beacon-chain/blockchain/forkchoice/process_block.go +++ b/beacon-chain/blockchain/forkchoice/process_block.go @@ -169,6 +169,12 @@ func (s *Store) updateBlockAttestationVote(ctx context.Context, att *ethpb.Attes if err != nil { return errors.Wrap(err, "could not get state for attestation tgt root") } + if err := s.waitForAttInclDelay(ctx, att, baseState); err != nil { + return errors.Wrap(err, "could not wait for attestation inclusion delay") + } + if err := s.verifyAttSlotTime(ctx, baseState, att.Data); err != nil { + return errors.Wrap(err, "could not verify attestation slot time") + } indexedAtt, err := blocks.ConvertToIndexed(baseState, att) if err != nil { return errors.Wrap(err, "could not convert attestation to indexed attestation") @@ -178,7 +184,7 @@ func (s *Store) updateBlockAttestationVote(ctx context.Context, att *ethpb.Attes if err != nil { return errors.Wrapf(err, "could not get latest vote for validator %d", i) } - if !s.db.HasValidatorLatestVote(ctx, i) || tgt.Epoch > vote.Epoch { + if vote == nil || tgt.Epoch > vote.Epoch { if err := s.db.SaveValidatorLatestVote(ctx, i, &pb.ValidatorLatestVote{ Epoch: tgt.Epoch, Root: tgt.Root, diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 2f7c3d208f..dbbe010326 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -35,7 +35,7 @@ func (s *Service) ReceiveAttestation(ctx context.Context, att *ethpb.Attestation return errors.Wrap(err, "could not broadcast attestation") } - attRoot, err := ssz.HashTreeRoot(att) + attRoot, err := ssz.HashTreeRoot(att.Data) if err != nil { log.WithError(err).Error("Failed to hash attestation") } diff --git a/beacon-chain/operations/service.go b/beacon-chain/operations/service.go index 34e0ddc890..edf584d982 100644 --- a/beacon-chain/operations/service.go +++ b/beacon-chain/operations/service.go @@ -4,7 +4,6 @@ package operations import ( "context" "fmt" - "sort" "sync" "time" @@ -54,6 +53,8 @@ type Service struct { incomingProcessedBlockFeed *event.Feed incomingProcessedBlock chan *ethpb.BeaconBlock error error + attestationPool map[[32]byte]*ethpb.Attestation + attestationPoolLock sync.Mutex attestationLockCache *ccache.Cache } @@ -72,6 +73,7 @@ func NewService(ctx context.Context, cfg *Config) *Service { beaconDB: cfg.BeaconDB, incomingProcessedBlockFeed: new(event.Feed), incomingProcessedBlock: make(chan *ethpb.BeaconBlock, params.BeaconConfig().DefaultBufferSize), + attestationPool: make(map[[32]byte]*ethpb.Attestation), attestationLockCache: ccache.New(ccache.Configure()), } } @@ -125,11 +127,11 @@ func (s *Service) retrieveLock(key [32]byte) *sync.Mutex { // the attestations are returned in slot ascending order and up to MaxAttestations // capacity. The attestations get deleted in DB after they have been retrieved. func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]*ethpb.Attestation, error) { - var attestations []*ethpb.Attestation - atts, err := s.beaconDB.Attestations(ctx, nil /*filter*/) - if err != nil { - return nil, errors.New("could not retrieve attestations from DB") - } + s.attestationPoolLock.Lock() + defer s.attestationPoolLock.Unlock() + + atts := make([]*ethpb.Attestation, 0, len(s.attestationPool)) + bState, err := s.beaconDB.HeadState(ctx) if err != nil { return nil, errors.New("could not retrieve attestations from DB") @@ -142,20 +144,15 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([] } } - sort.Slice(atts, func(i, j int) bool { - return atts[i].Data.Target.Epoch < atts[j].Data.Target.Epoch - }) - var validAttsCount uint64 - for _, att := range atts { + for _, att := range s.attestationPool { + root, err := ssz.HashTreeRoot(att.Data) + if err != nil { + return nil, err + } + if _, err = blocks.ProcessAttestation(bState, att); err != nil { - hash, err := ssz.HashTreeRoot(att) - if err != nil { - return nil, err - } - if err := s.beaconDB.DeleteAttestation(ctx, hash); err != nil { - return nil, err - } + delete(s.attestationPool, root) continue } @@ -164,9 +161,10 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([] if validAttsCount == params.BeaconConfig().MaxAttestations { break } - attestations = append(attestations, att) + + atts = append(atts, att) } - return attestations, nil + return atts, nil } // HandleValidatorExits processes a validator exit operation. @@ -188,6 +186,9 @@ func (s *Service) HandleValidatorExits(ctx context.Context, message proto.Messag // HandleAttestation processes a received attestation message. func (s *Service) HandleAttestation(ctx context.Context, message proto.Message) error { + s.attestationPoolLock.Lock() + defer s.attestationPoolLock.Unlock() + ctx, span := trace.StartSpan(ctx, "operations.HandleAttestation") defer span.End() @@ -201,23 +202,18 @@ func (s *Service) HandleAttestation(ctx context.Context, message proto.Message) lock.Lock() defer lock.Unlock() - if s.beaconDB.HasAttestation(ctx, root) { - dbAtt, err := s.beaconDB.Attestation(ctx, root) - if err != nil { - return err - } - attestation, err = helpers.AggregateAttestation(dbAtt, attestation) - if err != nil { - return err - } - if err := s.beaconDB.SaveAttestation(ctx, attestation); err != nil { - return err - } - } else { - if err := s.beaconDB.SaveAttestation(ctx, attestation); err != nil { - return err - } + savedAtt, ok := s.attestationPool[root] + if !ok { + s.attestationPool[root] = attestation + return nil } + + savedAtt, err = helpers.AggregateAttestation(savedAtt, attestation) + if err != nil { + return err + } + s.attestationPool[root] = savedAtt + return nil } @@ -253,16 +249,18 @@ func (s *Service) handleProcessedBlock(ctx context.Context, message proto.Messag // removeAttestationsFromPool removes a list of attestations from the DB // after they have been included in a beacon block. func (s *Service) removeAttestationsFromPool(ctx context.Context, attestations []*ethpb.Attestation) error { + s.attestationPoolLock.Lock() + defer s.attestationPoolLock.Unlock() + for _, attestation := range attestations { root, err := ssz.HashTreeRoot(attestation.Data) if err != nil { return err } - if s.beaconDB.HasAttestation(ctx, root) { - if err := s.beaconDB.DeleteAttestation(ctx, root); err != nil { - return err - } + _, ok := s.attestationPool[root] + if ok { + delete(s.attestationPool, root) log.WithField("root", fmt.Sprintf("%#x", root)).Debug("Attestation removed from pool") } } diff --git a/beacon-chain/operations/service_test.go b/beacon-chain/operations/service_test.go index 13492c6cee..a925d72a3e 100644 --- a/beacon-chain/operations/service_test.go +++ b/beacon-chain/operations/service_test.go @@ -168,6 +168,7 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) { opsSrv := NewService(ctx, &Config{ BeaconDB: beaconDB, }) + opsSrv.attestationPool = make(map[[32]byte]*ethpb.Attestation) // First, we create a common attestation data. data := ðpb.AttestationData{ @@ -254,12 +255,9 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) { } wg.Wait() - // We fetch the final attestation from the DB, which should be an aggregation of + // We fetch the final attestation from the attestation pool, which should be an aggregation of // all committee members effectively. - aggAtt, err := beaconDB.Attestation(ctx, attDataRoot) - if err != nil { - t.Error(err) - } + aggAtt := opsSrv.attestationPool[attDataRoot] b1 := aggAtt.AggregationBits.Bytes() b2 := totalAggBits.Bytes() @@ -278,11 +276,11 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) { func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T) { beaconDB := dbutil.SetupDB(t) defer dbutil.TeardownDB(t, beaconDB) - ctx := context.Background() helpers.ClearAllCaches() service := NewService(context.Background(), &Config{ BeaconDB: beaconDB, }) + service.attestationPool = make(map[[32]byte]*ethpb.Attestation) deposits, privKeys := testutil.SetupInitialDeposits(t, 200) beaconState, err := state.GenesisBeaconState(deposits, uint64(0), ðpb.Eth1Data{}) @@ -411,10 +409,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T) if err != nil { t.Error(err) } - dbAtt, err := service.beaconDB.Attestation(ctx, attDataHash) - if err != nil { - t.Error(err) - } + dbAtt := service.attestationPool[attDataHash] + dbAttBits := dbAtt.AggregationBits.Bytes() aggregatedBits := att1.AggregationBits.Or(att2.AggregationBits).Bytes() if !bytes.Equal(dbAttBits, aggregatedBits) { @@ -428,10 +424,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T) if err := service.HandleAttestation(context.Background(), att2); err != nil { t.Error(err) } - dbAtt, err = service.beaconDB.Attestation(ctx, attDataHash) - if err != nil { - t.Error(err) - } + dbAtt = service.attestationPool[attDataHash] + dbAttBits = dbAtt.AggregationBits.Bytes() if !bytes.Equal(dbAttBits, aggregatedBits) { t.Error("Expected aggregation bits to be equal.") @@ -444,10 +438,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T) if err := service.HandleAttestation(context.Background(), att3); err != nil { t.Error(err) } - dbAtt, err = service.beaconDB.Attestation(ctx, attDataHash) - if err != nil { - t.Error(err) - } + dbAtt = service.attestationPool[attDataHash] + dbAttBits = dbAtt.AggregationBits.Bytes() if !bytes.Equal(dbAttBits, aggregatedBits) { t.Error("Expected aggregation bits to be equal.") @@ -463,6 +455,7 @@ func TestRetrieveAttestations_OK(t *testing.T) { beaconDB := dbutil.SetupDB(t) defer dbutil.TeardownDB(t, beaconDB) service := NewService(context.Background(), &Config{BeaconDB: beaconDB}) + service.attestationPool = make(map[[32]byte]*ethpb.Attestation) deposits, privKeys := testutil.SetupInitialDeposits(t, 100) beaconState, err := state.GenesisBeaconState(deposits, uint64(0), ðpb.Eth1Data{}) @@ -524,9 +517,9 @@ func TestRetrieveAttestations_OK(t *testing.T) { } att.Data.Crosslink.ParentRoot = encoded[:] att.Data.Crosslink.DataRoot = params.BeaconConfig().ZeroHash[:] - if err := beaconDB.SaveAttestation(context.Background(), att); err != nil { - t.Fatal(err) - } + + r, _ := ssz.HashTreeRoot(att.Data) + service.attestationPool[r] = att headBlockRoot := [32]byte{1, 2, 3} if err := beaconDB.SaveHeadBlockRoot(context.Background(), headBlockRoot); err != nil { diff --git a/beacon-chain/rpc/BUILD.bazel b/beacon-chain/rpc/BUILD.bazel index f4022fbd0a..abb3d352ad 100644 --- a/beacon-chain/rpc/BUILD.bazel +++ b/beacon-chain/rpc/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//shared/params:go_default_library", "//shared/trieutil:go_default_library", "//shared/version:go_default_library", + "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", "@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library", "@com_github_grpc_ecosystem_go_grpc_middleware//recovery:go_default_library", diff --git a/beacon-chain/rpc/attester_server.go b/beacon-chain/rpc/attester_server.go index 1912f1036d..a6525a4b61 100644 --- a/beacon-chain/rpc/attester_server.go +++ b/beacon-chain/rpc/attester_server.go @@ -3,6 +3,7 @@ package rpc import ( "context" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" @@ -39,13 +40,14 @@ func (as *AttesterServer) SubmitAttestation(ctx context.Context, att *ethpb.Atte go func() { ctx = trace.NewContext(context.Background(), trace.FromContext(ctx)) - if err := as.operationsHandler.HandleAttestation(ctx, att); err != nil { - log.WithError(err).Error("could not handle attestation in operations service") - return - } if err := as.attReceiver.ReceiveAttestation(ctx, att); err != nil { log.WithError(err).Error("could not receive attestation in chain service") } + attCopy := proto.Clone(att).(*ethpb.Attestation) + if err := as.operationsHandler.HandleAttestation(ctx, attCopy); err != nil { + log.WithError(err).Error("could not handle attestation in operations service") + return + } }() return &pb.AttestResponse{Root: root[:]}, nil