Fix unclosed tickers/timers (#7190)

* fix resource leak
* fixes leak in blocks fetcher
* client/validator release ticker resorces
* powchain, more straightforward ticker closing
* adds missing ticker.stop() calls
* more straightforward ticker closing
* Merge refs/heads/master into fix-unclosed-tickers-timers
* Merge refs/heads/master into fix-unclosed-tickers-timers
* Merge refs/heads/master into fix-unclosed-tickers-timers
* gofmt issues introduced in https://github.com/prysmaticlabs/prysm/pull/7176
This commit is contained in:
Victor Farazdagi
2020-09-08 21:05:38 +03:00
committed by GitHub
parent f4848e46d4
commit 8baa22f065
10 changed files with 12 additions and 9 deletions

View File

@@ -22,6 +22,7 @@ var prepareForkChoiceAttsPeriod = slotutil.DivideSlotBy(3 /* times-per-slot */)
// every prepareForkChoiceAttsPeriod.
func (s *Service) prepareForkChoiceAtts() {
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
defer ticker.Stop()
for {
ctx := context.Background()
select {

View File

@@ -10,6 +10,7 @@ import (
// pruneAttsPool prunes attestations pool on every slot interval.
func (s *Service) pruneAttsPool() {
ticker := time.NewTicker(s.pruneInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
@@ -17,7 +18,6 @@ func (s *Service) pruneAttsPool() {
s.updateMetrics()
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
ticker.Stop()
return
}
}

View File

@@ -245,6 +245,7 @@ func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
done <- struct{}{}
}()
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:

View File

@@ -442,6 +442,7 @@ func (s *Service) waitForConnection() {
logCounter++
}
ticker := time.NewTicker(backOffPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
@@ -460,12 +461,10 @@ func (s *Service) waitForConnection() {
log.WithFields(logrus.Fields{
"endpoint": s.httpEndpoint,
}).Info("Connected to eth1 proof-of-work chain")
ticker.Stop()
return
}
log.Debug("Eth1 node is currently syncing")
case <-s.ctx.Done():
ticker.Stop()
log.Debug("Received cancelled context,closing existing powchain service")
return
}

View File

@@ -179,12 +179,12 @@ func (f *blocksFetcher) loop() {
// Periodically remove stale peer locks.
go func() {
ticker := time.NewTicker(peerLocksPollingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
f.removeStalePeerLocks(peerLockMaxAge)
case <-f.ctx.Done():
ticker.Stop()
return
}
}
@@ -338,9 +338,9 @@ func (f *blocksFetcher) requestBlocks(
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String()))
defer timer.Stop()
select {
case <-f.ctx.Done():
timer.Stop()
return nil, errFetcherCtxIsDone
case <-timer.C:
// Peer has gathered enough capacity to be polled again.

View File

@@ -169,6 +169,7 @@ func (q *blocksQueue) loop() {
}
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for {
// Check highest expected slot when we approach chain's head slot.
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
@@ -248,7 +249,6 @@ func (q *blocksQueue) loop() {
}
case <-q.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks queue)")
ticker.Stop()
return
}
}

View File

@@ -8,8 +8,8 @@ import (
"path/filepath"
"strings"
log "github.com/sirupsen/logrus"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// ExpandPath given a string which may be a relative path.

View File

@@ -60,6 +60,7 @@ func (bs *Service) querySyncStatus(ctx context.Context) {
return
}
ticker := time.NewTicker(syncStatusPollingInterval)
defer ticker.Stop()
log.Info("Waiting for beacon node to be fully synced...")
for {
select {

View File

@@ -149,6 +149,7 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
var atts []*ethpb.IndexedAttestation
halfSlot := slotutil.DivideSlotBy(2 /* 1/2 slot duration */)
ticker := time.NewTicker(halfSlot)
defer ticker.Stop()
for {
select {
case <-ticker.C:
@@ -185,6 +186,7 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
func (bs *Service) restartBeaconConnection(ctx context.Context) error {
ticker := time.NewTicker(reconnectPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
@@ -203,12 +205,10 @@ func (bs *Service) restartBeaconConnection(ctx context.Context) error {
continue
}
log.Info("Beacon node is fully synced")
return nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return errors.New("context closed, no longer attempting to restart stream")
}
}
}

View File

@@ -216,6 +216,7 @@ func (v *validator) SlasherReady(ctx context.Context) error {
return nil
}
ticker := time.NewTicker(reconnectPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C: