mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Compare commits
2 Commits
devnet-2-b
...
useFixedIn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47abeb8da1 | ||
|
|
b6f84cc6cb |
@@ -10,7 +10,10 @@ go_library(
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/async",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["@com_github_sirupsen_logrus//:go_default_library"],
|
||||
deps = [
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
@@ -24,6 +27,7 @@ go_test(
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//config/params:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -29,3 +30,21 @@ func RunEvery(ctx context.Context, period time.Duration, f func()) {
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func RunWithTickerAndInterval(ctx context.Context, genesis time.Time, intervals []time.Duration, f func()) {
|
||||
funcName := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
|
||||
ticker := slots.NewSlotTickerWithIntervals(genesis, intervals)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C():
|
||||
log.WithField("function", funcName).Trace("running")
|
||||
f()
|
||||
case <-ctx.Done():
|
||||
log.WithField("function", funcName).Debug("context is closed, exiting")
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/async"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
)
|
||||
|
||||
func TestEveryRuns(t *testing.T) {
|
||||
@@ -38,3 +39,46 @@ func TestEveryRuns(t *testing.T) {
|
||||
t.Error("Counter incremented after stop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEveryRunsWithTickerAndInterval(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
params.SetupTestConfigCleanup(t)
|
||||
newCfg := params.BeaconConfig()
|
||||
newCfg.SecondsPerSlot = 1
|
||||
params.OverrideBeaconConfig(newCfg)
|
||||
|
||||
i := int32(0)
|
||||
|
||||
genTime := time.Now()
|
||||
async.RunWithTickerAndInterval(ctx, genTime, []time.Duration{100 * time.Millisecond, 200 * time.Millisecond}, func() {
|
||||
atomic.AddInt32(&i, 1)
|
||||
})
|
||||
|
||||
// Sleep for a bit and ensure the value has increased.
|
||||
time.Sleep(1150 * time.Millisecond)
|
||||
|
||||
if atomic.LoadInt32(&i) != 1 {
|
||||
t.Errorf("Counter failed to increment with ticker: Got %d instead of %d", atomic.LoadInt32(&i), 1)
|
||||
}
|
||||
|
||||
// Sleep for a bit and ensure the value has increased.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
if atomic.LoadInt32(&i) != 2 {
|
||||
t.Errorf("Counter failed to increment with ticker: Got %d instead of %d", atomic.LoadInt32(&i), 2)
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
// Sleep for a bit to let the cancel take place.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
last := atomic.LoadInt32(&i)
|
||||
|
||||
// Sleep for a bit and ensure the value has not increased.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
if atomic.LoadInt32(&i) != last {
|
||||
t.Error("Counter incremented after stop")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/prysmaticlabs/prysm/v4/async"
|
||||
@@ -20,14 +21,20 @@ import (
|
||||
)
|
||||
|
||||
// This defines how often a node cleans up and processes pending attestations in the queue.
|
||||
var processPendingAttsPeriod = slots.DivideSlotBy(2 /* twice per slot */)
|
||||
|
||||
var pendingAttsLimit = 10000
|
||||
|
||||
// This processes pending attestation queues on every `processPendingAttsPeriod`.
|
||||
func (s *Service) processPendingAttsQueue() {
|
||||
clock, err := s.clockWaiter.WaitForClock(s.ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("attestation queue failed to receive genesis data")
|
||||
return
|
||||
}
|
||||
// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
|
||||
mutex := new(sync.Mutex)
|
||||
async.RunEvery(s.ctx, processPendingAttsPeriod, func() {
|
||||
processPendingAttsPeriod := slots.DivideSlotBy(2 /* twice per slot */)
|
||||
async.RunWithTickerAndInterval(s.ctx, clock.GenesisTime(), []time.Duration{0, processPendingAttsPeriod}, func() {
|
||||
mutex.Lock()
|
||||
if err := s.processPendingAtts(s.ctx); err != nil {
|
||||
log.WithError(err).Debugf("Could not process pending attestation: %v", err)
|
||||
|
||||
@@ -27,7 +27,11 @@ import (
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
var processPendingBlocksPeriod = slots.DivideSlotBy(3 /* times per slot */)
|
||||
const (
|
||||
firstIntervalPercentage = 17
|
||||
secondIntervalPercentage = 42
|
||||
thirdIntervalPercentage = 92
|
||||
)
|
||||
|
||||
const maxPeerRequest = 50
|
||||
const numOfTries = 5
|
||||
@@ -35,9 +39,15 @@ const maxBlocksPerSlot = 3
|
||||
|
||||
// processes pending blocks queue on every processPendingBlocksPeriod
|
||||
func (s *Service) processPendingBlocksQueue() {
|
||||
clock, err := s.clockWaiter.WaitForClock(s.ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("attestation queue failed to receive genesis data")
|
||||
return
|
||||
}
|
||||
// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
|
||||
locker := new(sync.Mutex)
|
||||
async.RunEvery(s.ctx, processPendingBlocksPeriod, func() {
|
||||
slotTimeMultiplier := (time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) / 100
|
||||
async.RunWithTickerAndInterval(s.ctx, clock.GenesisTime(), []time.Duration{firstIntervalPercentage * slotTimeMultiplier, secondIntervalPercentage * slotTimeMultiplier, thirdIntervalPercentage * slotTimeMultiplier}, func() {
|
||||
// Don't process the pending blocks if genesis time has not been set. The chain is not ready.
|
||||
if !s.chainIsStarted() {
|
||||
return
|
||||
|
||||
@@ -212,8 +212,8 @@ func (s *Service) Start() {
|
||||
return nil
|
||||
})
|
||||
s.cfg.p2p.AddPingMethod(s.sendPingRequest)
|
||||
s.processPendingBlocksQueue()
|
||||
s.processPendingAttsQueue()
|
||||
go s.processPendingBlocksQueue()
|
||||
go s.processPendingAttsQueue()
|
||||
s.maintainPeerStatuses()
|
||||
s.resyncIfBehind()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user