Compare commits

...

2 Commits

Author    SHA1          Message       Date
nisdas    47abeb8da1    fix panics    2023-11-28 15:50:01 +08:00
nisdas    b6f84cc6cb    use it        2023-11-28 15:20:12 +08:00
6 changed files with 91 additions and 7 deletions

View File

@@ -10,7 +10,10 @@ go_library(
     ],
     importpath = "github.com/prysmaticlabs/prysm/v4/async",
     visibility = ["//visibility:public"],
-    deps = ["@com_github_sirupsen_logrus//:go_default_library"],
+    deps = [
+        "//time/slots:go_default_library",
+        "@com_github_sirupsen_logrus//:go_default_library",
+    ],
 )
 
 go_test(
@@ -24,6 +27,7 @@ go_test(
     ],
     embed = [":go_default_library"],
     deps = [
+        "//config/params:go_default_library",
        "//testing/assert:go_default_library",
        "//testing/require:go_default_library",
        "//testing/util:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
 	"runtime"
 	"time"
 
+	"github.com/prysmaticlabs/prysm/v4/time/slots"
 	log "github.com/sirupsen/logrus"
 )
@@ -29,3 +30,21 @@ func RunEvery(ctx context.Context, period time.Duration, f func()) {
 		}
 	}()
 }
+
+func RunWithTickerAndInterval(ctx context.Context, genesis time.Time, intervals []time.Duration, f func()) {
+	funcName := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
+	ticker := slots.NewSlotTickerWithIntervals(genesis, intervals)
+	go func() {
+		for {
+			select {
+			case <-ticker.C():
+				log.WithField("function", funcName).Trace("running")
+				f()
+			case <-ctx.Done():
+				log.WithField("function", funcName).Debug("context is closed, exiting")
+				ticker.Done()
+				return
+			}
+		}
+	}()
+}
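A usage sketch (not part of this change): with a beacon config loaded, a caller anchors periodic work to slot boundaries by passing the genesis time plus a list of offsets into each slot. The 0 and 6-second intervals below are illustrative values, not taken from the commit.

package main

import (
	"context"
	"time"

	"github.com/prysmaticlabs/prysm/v4/async"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A real node takes the genesis time from its genesis clock; time.Now()
	// just anchors slot 0 at startup for this sketch.
	genesis := time.Now()

	// Run at the start of each slot and again 6 seconds in.
	async.RunWithTickerAndInterval(ctx, genesis, []time.Duration{0, 6 * time.Second}, func() {
		// periodic per-slot work goes here
	})

	time.Sleep(30 * time.Second) // let a few slots elapse before exiting
}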

View File

@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/prysmaticlabs/prysm/v4/async"
+	"github.com/prysmaticlabs/prysm/v4/config/params"
 )
 
 func TestEveryRuns(t *testing.T) {
@@ -38,3 +39,46 @@ func TestEveryRuns(t *testing.T) {
 		t.Error("Counter incremented after stop")
 	}
 }
+
+func TestEveryRunsWithTickerAndInterval(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	params.SetupTestConfigCleanup(t)
+	newCfg := params.BeaconConfig()
+	newCfg.SecondsPerSlot = 1
+	params.OverrideBeaconConfig(newCfg)
+
+	i := int32(0)
+	genTime := time.Now()
+	async.RunWithTickerAndInterval(ctx, genTime, []time.Duration{100 * time.Millisecond, 200 * time.Millisecond}, func() {
+		atomic.AddInt32(&i, 1)
+	})
+
+	// Sleep for a bit and ensure the value has increased.
+	time.Sleep(1150 * time.Millisecond)
+	if atomic.LoadInt32(&i) != 1 {
+		t.Errorf("Counter failed to increment with ticker: Got %d instead of %d", atomic.LoadInt32(&i), 1)
+	}
+	// Sleep for a bit and ensure the value has increased.
+	time.Sleep(200 * time.Millisecond)
+	if atomic.LoadInt32(&i) != 2 {
+		t.Errorf("Counter failed to increment with ticker: Got %d instead of %d", atomic.LoadInt32(&i), 2)
+	}
+	cancel()
+	// Sleep for a bit to let the cancel take place.
+	time.Sleep(100 * time.Millisecond)
+	last := atomic.LoadInt32(&i)
+	// Sleep for a bit and ensure the value has not increased.
+	time.Sleep(200 * time.Millisecond)
+	if atomic.LoadInt32(&i) != last {
+		t.Error("Counter incremented after stop")
+	}
+}
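Reading the sleeps in this test: with SecondsPerSlot overridden to 1 and intervals of 100ms and 200ms, the assertions only line up if the ticker begins firing from the slot after startup (an inference from the timings, not from the ticker's source):

	~1.10s  first tick (slot 1 + 100ms); checked at 1.15s, expecting i == 1
	~1.20s  second tick (slot 1 + 200ms); checked at 1.35s, expecting i == 2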

View File

@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/hex"
 	"sync"
+	"time"
 
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/prysmaticlabs/prysm/v4/async"
@@ -20,14 +21,20 @@ import (
 )
 
-// This defines how often a node cleans up and processes pending attestations in the queue.
-var processPendingAttsPeriod = slots.DivideSlotBy(2 /* twice per slot */)
-
 var pendingAttsLimit = 10000
 
 // This processes pending attestation queues on every `processPendingAttsPeriod`.
 func (s *Service) processPendingAttsQueue() {
+	clock, err := s.clockWaiter.WaitForClock(s.ctx)
+	if err != nil {
+		log.WithError(err).Error("attestation queue failed to receive genesis data")
+		return
+	}
 	// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
 	mutex := new(sync.Mutex)
-	async.RunEvery(s.ctx, processPendingAttsPeriod, func() {
+	processPendingAttsPeriod := slots.DivideSlotBy(2 /* twice per slot */)
+	async.RunWithTickerAndInterval(s.ctx, clock.GenesisTime(), []time.Duration{0, processPendingAttsPeriod}, func() {
 		mutex.Lock()
 		if err := s.processPendingAtts(s.ctx); err != nil {
 			log.WithError(err).Debugf("Could not process pending attestation: %v", err)
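Two things to note in this hunk. First, processPendingAttsPeriod is now computed inside the function rather than in a package-level var, so slots.DivideSlotBy picks up the beacon config in effect at call time instead of at package init. Second, with mainnet's 12-second slots, DivideSlotBy(2) yields 6s, so the intervals {0, 6s} drain the queue at the start of each slot and again halfway through, anchored to genesis time instead of a free-running 6-second timer.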

View File

@@ -27,7 +27,11 @@ import (
 	"go.opencensus.io/trace"
 )
 
-var processPendingBlocksPeriod = slots.DivideSlotBy(3 /* times per slot */)
+const (
+	firstIntervalPercentage  = 17
+	secondIntervalPercentage = 42
+	thirdIntervalPercentage  = 92
+)
 
 const maxPeerRequest = 50
 const numOfTries = 5
@@ -35,9 +39,15 @@ const maxBlocksPerSlot = 3
 
 // processes pending blocks queue on every processPendingBlocksPeriod
 func (s *Service) processPendingBlocksQueue() {
+	clock, err := s.clockWaiter.WaitForClock(s.ctx)
+	if err != nil {
+		log.WithError(err).Error("attestation queue failed to receive genesis data")
+		return
+	}
 	// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
 	locker := new(sync.Mutex)
-	async.RunEvery(s.ctx, processPendingBlocksPeriod, func() {
+	slotTimeMultiplier := (time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) / 100
+	async.RunWithTickerAndInterval(s.ctx, clock.GenesisTime(), []time.Duration{firstIntervalPercentage * slotTimeMultiplier, secondIntervalPercentage * slotTimeMultiplier, thirdIntervalPercentage * slotTimeMultiplier}, func() {
 		// Don't process the pending blocks if genesis time has not been set. The chain is not ready.
 		if !s.chainIsStarted() {
 			return
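Worked out with mainnet's 12-second slots: slotTimeMultiplier = 12s / 100 = 120ms, so the queue now runs at 17 × 120ms = 2.04s, 42 × 120ms = 5.04s, and 92 × 120ms = 11.04s into each slot. That is still three times per slot, as with the old DivideSlotBy(3) period of 4s, but at fixed points within the slot rather than on a drifting timer; other SecondsPerSlot values scale these points proportionally.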

View File

@@ -212,8 +212,8 @@ func (s *Service) Start() {
 		return nil
 	})
 	s.cfg.p2p.AddPingMethod(s.sendPingRequest)
-	s.processPendingBlocksQueue()
-	s.processPendingAttsQueue()
+	go s.processPendingBlocksQueue()
+	go s.processPendingAttsQueue()
 	s.maintainPeerStatuses()
 	s.resyncIfBehind()
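The new go prefix is load-bearing: both queue starters now wait on clockWaiter.WaitForClock for genesis data before spawning their ticker loops, so calling them synchronously would stall the rest of Start() until genesis. A minimal sketch of the pattern (hypothetical names, not Prysm's API):

package main

import (
	"fmt"
	"time"
)

// startQueue mimics the new shape of the queue starters: it blocks until
// genesis data arrives, as clockWaiter.WaitForClock does, before doing any
// periodic work.
func startQueue(clockReady <-chan time.Time) {
	genesis := <-clockReady // blocks, possibly for a long time before genesis
	fmt.Println("queue started, ticker anchored to", genesis)
}

func main() {
	clockReady := make(chan time.Time)

	// Without the go keyword, startup would hang right here until genesis.
	go startQueue(clockReady)

	clockReady <- time.Now() // genesis data arrives later
	time.Sleep(10 * time.Millisecond)
}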