Do not panic if initial sync fails (#4477)

* Do not panic if initial sync fails
* Only consider peers with non-0 finalized epoch
* Additional fixes
* Fix tests
* Merge branch 'master' into mvlog
* Merge branch 'master' into mvlog
This commit is contained in:
Jim McDonald
2020-01-10 21:36:20 +00:00
committed by prylabs-bulldozer[bot]
parent 37459ee765
commit 22a3bf53ad
5 changed files with 34 additions and 32 deletions

View File

@@ -317,17 +317,16 @@ func (p *Status) Decay() {
}
}
// BestFinalized returns the highest finalized epoch that is agreed upon by the majority of
// peers. This method may not return the absolute highest finalized, but the finalized epoch in
// which most peers can serve blocks. Ideally, all peers would be reporting the same finalized
// epoch.
// Returns the best finalized root, epoch number, and peers that agree.
func (p *Status) BestFinalized(maxPeers int) ([]byte, uint64, []peer.ID) {
// BestFinalized returns the highest finalized epoch equal to or higher than ours that is agreed upon by the majority of peers.
// This method may not return the absolute highest finalized, but the finalized epoch in which most peers can serve blocks.
// Ideally, all peers would be reporting the same finalized epoch.
// Returns the best finalized root, epoch number, and list of peers that agree.
func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch uint64) ([]byte, uint64, []peer.ID) {
finalized := make(map[[32]byte]uint64)
rootToEpoch := make(map[[32]byte]uint64)
for _, pid := range p.Connected() {
peerChainState, err := p.ChainState(pid)
if err == nil && peerChainState != nil {
if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch >= ourFinalizedEpoch {
r := bytesutil.ToBytes32(peerChainState.FinalizedRoot)
finalized[r]++
rootToEpoch[r] = peerChainState.FinalizedEpoch

View File

@@ -378,7 +378,7 @@ func TestBestPeer(t *testing.T) {
FinalizedEpoch: 3,
FinalizedRoot: junkRoot[:],
})
retRoot, retEpoch, _ := p.BestFinalized(15)
retRoot, retEpoch, _ := p.BestFinalized(15, 0)
if !bytes.Equal(retRoot, expectedRoot[:]) {
t.Errorf("Incorrect Finalized Root retrieved; wanted %v but got %v", expectedRoot, retRoot)
}
@@ -400,7 +400,7 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {
})
}
_, _, pids := p.BestFinalized(maxPeers)
_, _, pids := p.BestFinalized(maxPeers, 0)
if len(pids) != maxPeers {
t.Fatalf("returned wrong number of peers, wanted %d, got %d", maxPeers, len(pids))
}

View File

@@ -46,7 +46,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
var lastEmptyRequests int
// Step 1 - Sync to end of finalized epoch.
for s.chain.HeadSlot() < helpers.StartSlot(s.highestFinalizedEpoch()+1) {
root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
if len(peers) == 0 {
log.Warn("No peers; waiting for reconnect")
time.Sleep(refreshTime)
@@ -120,7 +120,6 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
}()
resp, err := s.requestBlocks(ctx, req, pid)
log.WithField("peer", pid.Pretty()).Debugf("Received %d blocks", len(resp))
if err != nil {
// fail over to other peers by splitting this requests evenly across them.
ps := append(peers[:i], peers[i+1:]...)
@@ -141,6 +140,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
return
}
}
log.WithField("peer", pid).WithField("count", len(resp)).Debug("Received blocks")
blocksChan <- resp
}(i, pid)
}
@@ -225,13 +225,13 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
// we receive there after must build on the finalized chain or be considered invalid during
// fork choice resolution / block processing.
best := s.bestPeer()
root, _, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
root, _, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
// if no best peer exists, retry until a new best peer is found.
for len(best) == 0 {
time.Sleep(refreshTime)
best = s.bestPeer()
root, _, _ = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
root, _, _ = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
}
for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; {
req := &p2ppb.BeaconBlocksByRangeRequest{
@@ -297,7 +297,7 @@ func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRa
// highestFinalizedEpoch as reported by peers. This is the absolute highest finalized epoch as
// reported by peers.
func (s *Service) highestFinalizedEpoch() uint64 {
_, epoch, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
_, epoch, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
return epoch
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/sirupsen/logrus"
)
@@ -114,12 +115,10 @@ func (s *Service) Start() {
return
}
s.waitForMinimumPeers()
if err := s.roundRobinSync(genesis); err != nil {
panic(err)
if err := s.roundRobinSync(genesis); err == nil {
log.Infof("Synced up to slot %d", s.chain.HeadSlot())
s.synced = true
}
log.Infof("Synced up to slot %d", s.chain.HeadSlot())
s.synced = true
}
// Stop initial sync.
@@ -152,25 +151,30 @@ func (s *Service) Resync() error {
genesis := time.Unix(int64(headState.GenesisTime), 0)
s.waitForMinimumPeers()
if err := s.roundRobinSync(genesis); err != nil {
return errors.Wrap(err, "could not retrieve head state")
err = s.roundRobinSync(genesis)
if err == nil {
s.synced = true
} else {
log = log.WithError(err)
}
log.Infof("Synced up to slot %d", s.chain.HeadSlot())
log.WithField("synced", s.synced).WithField("slot", s.chain.HeadSlot()).Info("Resync attempt complete")
s.synced = true
return nil
}
func (s *Service) waitForMinimumPeers() {
// Every 5 sec, report handshake count.
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
count := len(s.p2p.Peers().Connected())
if count >= flags.Get().MinimumSyncPeers {
_, _, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
if len(peers) >= required {
break
}
log.WithFields(logrus.Fields{
"valid handshakes": count,
"required handshakes": flags.Get().MinimumSyncPeers}).Info("Waiting for enough peer handshakes before syncing")
"suitable": len(peers),
"required": required}).Info("Waiting for enough suitable peers before syncing")
time.Sleep(handshakePollingInterval)
}
}

View File

@@ -36,14 +36,13 @@ func (r *Service) maintainPeerStatuses() {
}
}
}
if !r.initialSync.Syncing() {
_, highestEpoch, _ := r.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
for !r.initialSync.Syncing() {
_, highestEpoch, _ := r.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, r.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
if helpers.StartSlot(highestEpoch) > r.chain.HeadSlot() {
numberOfTimesResyncedCounter.Inc()
r.clearPendingSlots()
// block until we can resync the node
if err := r.initialSync.Resync(); err != nil {
log.Errorf("Could not Resync Chain: %v", err)
log.Errorf("Could not resync chain: %v", err)
}
}
}