Refactors attestation pool pruning (#6485)

This commit is contained in:
Victor Farazdagi
2020-07-03 19:43:56 +03:00
committed by GitHub
parent c69b3f568e
commit 688f0d7114
4 changed files with 91 additions and 9 deletions

View File

@@ -47,6 +47,7 @@ go_test(
"//shared/bls:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/runutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",

View File

@@ -7,19 +7,16 @@ import (
"github.com/prysmaticlabs/prysm/shared/roughtime"
)
// Prune expired attestations from the pool every slot interval.
var pruneExpiredAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
// This prunes attestations pool by running pruneExpiredAtts
// at every pruneExpiredAttsPeriod.
// pruneAttsPool prunes attestations pool on every slot interval.
func (s *Service) pruneAttsPool() {
ticker := time.NewTicker(pruneExpiredAttsPeriod)
ticker := time.NewTicker(s.pruneInterval)
for {
select {
case <-ticker.C:
s.pruneExpiredAtts()
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
ticker.Stop()
return
}
}

View File

@@ -3,14 +3,87 @@ package attestations
import (
"context"
"testing"
"time"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/prysmaticlabs/prysm/shared/runutil"
)
func TestPruneExpiredAtts_CanPrune(t *testing.T) {
func TestPruneExpired_Ticker(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s, err := NewService(ctx, &Config{
Pool: NewPool(),
pruneInterval: 250 * time.Millisecond,
})
if err != nil {
t.Fatal(err)
}
atts := []*ethpb.Attestation{
{Data: &ethpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b1000, 0b1}},
{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1000, 0b1}},
}
if err := s.pool.SaveUnaggregatedAttestations(atts); err != nil {
t.Fatal(err)
}
if s.pool.UnaggregatedAttestationCount() != 2 {
t.Fatalf("Unexpected number of attestations: %d", s.pool.UnaggregatedAttestationCount())
}
atts = []*ethpb.Attestation{
{Data: &ethpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b1101, 0b1}},
{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101, 0b1}},
}
if err := s.pool.SaveAggregatedAttestations(atts); err != nil {
t.Fatal(err)
}
if s.pool.AggregatedAttestationCount() != 2 {
t.Fatalf("Unexpected number of attestations: %d", s.pool.AggregatedAttestationCount())
}
if err := s.pool.SaveBlockAttestations(atts); err != nil {
t.Fatal(err)
}
// Rewind back one epoch worth of time.
s.genesisTime = uint64(roughtime.Now().Unix()) - params.BeaconConfig().SlotsPerEpoch*params.BeaconConfig().SecondsPerSlot
go s.pruneAttsPool()
done := make(chan struct{}, 1)
runutil.RunEvery(ctx, 500*time.Millisecond, func() {
for _, attestation := range s.pool.UnaggregatedAttestations() {
if attestation.Data.Slot == 0 {
return
}
}
for _, attestation := range s.pool.AggregatedAttestations() {
if attestation.Data.Slot == 0 {
return
}
}
for _, attestation := range s.pool.BlockAttestations() {
if attestation.Data.Slot == 0 {
return
}
}
if s.pool.UnaggregatedAttestationCount() != 1 || s.pool.AggregatedAttestationCount() != 1 {
return
}
done <- struct{}{}
})
select {
case <-done:
// All checks are passed.
case <-ctx.Done():
t.Error("Test case takes too long to complete")
}
}
func TestPruneExpired_PruneExpiredAtts(t *testing.T) {
s, err := NewService(context.Background(), &Config{Pool: NewPool()})
if err != nil {
t.Fatal(err)
@@ -45,7 +118,7 @@ func TestPruneExpiredAtts_CanPrune(t *testing.T) {
}
}
func TestExpired_AttsCanExpire(t *testing.T) {
func TestPruneExpired_Expired(t *testing.T) {
s, err := NewService(context.Background(), &Config{Pool: NewPool()})
if err != nil {
t.Fatal(err)

View File

@@ -5,8 +5,10 @@ package attestations
import (
"context"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/prysmaticlabs/prysm/shared/params"
)
var forkChoiceProcessedRootsSize = 1 << 16
@@ -19,11 +21,13 @@ type Service struct {
err error
forkChoiceProcessedRoots *lru.Cache
genesisTime uint64
pruneInterval time.Duration
}
// Config options for the service.
type Config struct {
Pool Pool
Pool Pool
pruneInterval time.Duration
}
// NewService instantiates a new attestation pool service instance that will
@@ -34,12 +38,19 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
return nil, err
}
pruneInterval := cfg.pruneInterval
if pruneInterval == 0 {
// Prune expired attestations from the pool every slot interval.
pruneInterval = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
pool: cfg.Pool,
forkChoiceProcessedRoots: cache,
pruneInterval: pruneInterval,
}, nil
}