From 6f8349cdb45bb3002e4ef6beb9cc0720fcdcfadd Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Mon, 22 Jun 2020 14:23:23 +0300 Subject: [PATCH] Reclaims leakybucket resources in sync service (#6339) * Reclaims leakybucket resources * move calls to defer * do not reuse queue, after stopping --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 6 ++++++ beacon-chain/sync/initial-sync/round_robin.go | 8 ++++++-- beacon-chain/sync/service.go | 6 ++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index e8dde8be5e..c2af45e0e6 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -132,6 +132,12 @@ func (f *blocksFetcher) start() error { // stop terminates all fetcher operations. func (f *blocksFetcher) stop() { + defer func() { + if f.rateLimiter != nil { + f.rateLimiter.Free() + f.rateLimiter = nil + } + }() f.cancel() <-f.quit // make sure that loop() is done } diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 16d1ffdb58..ad8bcf8074 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -80,6 +80,10 @@ func (s *Service) roundRobinSync(genesis time.Time) error { // mitigation. We are already convinced that we are on the correct finalized chain. Any blocks // we receive there after must build on the finalized chain or be considered invalid during // fork choice resolution / block processing. + blocksFetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ + p2p: s.p2p, + headFetcher: s.chain, + }) _, _, pids := s.p2p.Peers().BestFinalized(1 /* maxPeers */, s.highestFinalizedEpoch()) for len(pids) == 0 { log.Info("Waiting for a suitable peer before syncing to the head of the chain") @@ -90,7 +94,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error { for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; { count := mathutil.Min( - helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, queue.blocksFetcher.blocksPerSecond) + helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, blocksFetcher.blocksPerSecond) req := &p2ppb.BeaconBlocksByRangeRequest{ StartSlot: s.chain.HeadSlot() + 1, Count: count, @@ -100,7 +104,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error { "req": req, "peer": best.Pretty(), }).Debug("Sending batch block request") - resp, err := queue.blocksFetcher.requestBlocks(ctx, req, best) + resp, err := blocksFetcher.requestBlocks(ctx, req, best) if err != nil { log.WithError(err).Error("Failed to receive blocks, exiting init sync") return nil diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index b3786ffd24..1122f7ff3d 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -159,6 +159,12 @@ func (r *Service) Start() { // Stop the regular sync service. func (r *Service) Stop() error { + defer func() { + if r.blocksRateLimiter != nil { + r.blocksRateLimiter.Free() + r.blocksRateLimiter = nil + } + }() defer r.cancel() return nil }