Add Ability to Resync Node (#4279)

* add resyncing functionality

* add more validation to status message

* lint and build

* jim's review

* preston's review

* clean up

* remove log

* remove no sync

* change again

* change back

* remove spaces

* Update shared/slotutil/slottime.go

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* Apply suggestions from code review

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* fix refs

* raul's review

* goimports

* goimports

* add counter

* removed condition

* change back

* gaz

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das
2020-01-02 16:09:28 +08:00
committed by GitHub
parent bdc4045e23
commit 03356fc7b5
17 changed files with 232 additions and 63 deletions

View File

@@ -7,6 +7,7 @@ go_library(
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",

View File

@@ -28,6 +28,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/roughtime"
)
@@ -316,6 +317,46 @@ func (p *Status) Decay() {
}
}
// BestFinalized returns the finalized root reported by the largest number of connected peers,
// the finalized epoch associated with that root, and up to maxPeers connected peers whose
// reported finalized epoch is at least that epoch.
//
// This method may not return the absolute highest finalized epoch, but the finalized epoch for
// which most peers can serve blocks. Ideally, all peers would be reporting the same finalized
// epoch. When two roots receive the same number of votes, the root with the higher finalized
// epoch is preferred so that the outcome does not depend on map iteration order.
//
// Peers whose chain state is unavailable (lookup error or nil state) are ignored.
func (p *Status) BestFinalized(maxPeers int) ([]byte, uint64, []peer.ID) {
	// Take a single snapshot of the connected peers so that vote counting and peer selection
	// below operate on the same set.
	connected := p.Connected()

	finalized := make(map[[32]byte]uint64)
	rootToEpoch := make(map[[32]byte]uint64)
	for _, pid := range connected {
		peerChainState, err := p.ChainState(pid)
		if err != nil || peerChainState == nil {
			continue
		}
		r := bytesutil.ToBytes32(peerChainState.FinalizedRoot)
		finalized[r]++
		rootToEpoch[r] = peerChainState.FinalizedEpoch
	}

	var mostVotedFinalizedRoot [32]byte
	var mostVotes uint64
	for root, count := range finalized {
		// Break vote ties in favour of the higher epoch; map iteration order is random, so
		// without this the selected root could differ from call to call.
		tieWithHigherEpoch := count == mostVotes && rootToEpoch[root] > rootToEpoch[mostVotedFinalizedRoot]
		if count > mostVotes || tieWithHigherEpoch {
			mostVotes = count
			mostVotedFinalizedRoot = root
		}
	}
	bestEpoch := rootToEpoch[mostVotedFinalizedRoot]

	var pids []peer.ID
	for _, pid := range connected {
		peerChainState, err := p.ChainState(pid)
		if err != nil || peerChainState == nil || peerChainState.FinalizedEpoch < bestEpoch {
			continue
		}
		pids = append(pids, pid)
		if len(pids) >= maxPeers {
			break
		}
	}

	return mostVotedFinalizedRoot[:], bestEpoch, pids
}
// fetch is a helper function that fetches a peer status, possibly creating it.
func (p *Status) fetch(pid peer.ID) *peerStatus {
if _, ok := p.status[pid]; !ok {

View File

@@ -1,6 +1,7 @@
package peers_test
import (
"bytes"
"crypto/rand"
"fmt"
"testing"
@@ -334,6 +335,77 @@ func TestDecay(t *testing.T) {
}
}
// TestBestPeer registers six connected peers, four of which report the expected finalized
// checkpoint and two of which report a different one, and checks that BestFinalized returns
// the checkpoint reported by the majority.
func TestBestPeer(t *testing.T) {
	expectedFinEpoch := uint64(4)
	expectedRoot := [32]byte{'t', 'e', 's', 't'}
	junkRoot := [32]byte{'j', 'u', 'n', 'k'}

	status := peers.NewStatus(2 /* maxBadResponses */)

	// Chain states for peers 1 through 6, in registration order.
	reports := []struct {
		epoch uint64
		root  []byte
	}{
		{epoch: expectedFinEpoch, root: expectedRoot[:]},
		{epoch: expectedFinEpoch, root: expectedRoot[:]},
		{epoch: 3, root: junkRoot[:]},
		{epoch: expectedFinEpoch, root: expectedRoot[:]},
		{epoch: expectedFinEpoch, root: expectedRoot[:]},
		{epoch: 3, root: junkRoot[:]},
	}
	for _, report := range reports {
		pid := addPeer(t, status, peers.PeerConnected)
		status.SetChainState(pid, &pb.Status{
			FinalizedEpoch: report.epoch,
			FinalizedRoot:  report.root,
		})
	}

	retRoot, retEpoch, _ := status.BestFinalized(15)
	if !bytes.Equal(retRoot, expectedRoot[:]) {
		t.Errorf("Incorrect Finalized Root retrieved; wanted %v but got %v", expectedRoot, retRoot)
	}
	if retEpoch != expectedFinEpoch {
		t.Errorf("Incorrect Finalized epoch retrieved; wanted %v but got %v", expectedFinEpoch, retEpoch)
	}
}
// TestBestFinalized_returnsMaxValue connects more peers than the requested limit, all
// reporting the same finalized epoch, and checks that BestFinalized returns exactly
// the requested number of peers.
func TestBestFinalized_returnsMaxValue(t *testing.T) {
	const (
		maxBadResponses = 2
		maxPeers        = 10
	)
	status := peers.NewStatus(maxBadResponses)

	// Register maxPeers+101 peers so the cap is guaranteed to be hit.
	for n := 0; n < maxPeers+101; n++ {
		id := peer.ID(rune(n))
		status.Add(id, nil, network.DirOutbound)
		status.SetConnectionState(id, peers.PeerConnected)
		status.SetChainState(id, &pb.Status{FinalizedEpoch: 10})
	}

	_, _, pids := status.BestFinalized(maxPeers)
	if got := len(pids); got != maxPeers {
		t.Fatalf("returned wrong number of peers, wanted %d, got %d", maxPeers, got)
	}
}
// addPeer is a helper that adds a peer with the given connection state.
func addPeer(t *testing.T, p *peers.Status, state peers.PeerConnectionState) peer.ID {
// Set up some peers with different states

View File

@@ -3,7 +3,7 @@ package p2p
import (
"encoding/base64"
"github.com/libp2p/go-libp2p-pubsub/pb"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/shared/hashutil"
)

View File

@@ -50,6 +50,7 @@ go_library(
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/runutil:go_default_library",
"//shared/slotutil:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
const genericError = "internal service error"
var errWrongForkVersion = errors.New("wrong fork version")
var errInvalidEpoch = errors.New("invalid epoch")
var responseCodeSuccess = byte(0x00)
var responseCodeInvalidRequest = byte(0x01)

View File

@@ -25,6 +25,7 @@ go_library(
"//shared/mathutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/slotutil:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
@@ -50,8 +51,8 @@ go_test(
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/slotutil:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -19,11 +19,12 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
)
const blockBatchSize = 64
const maxPeersToSync = 15
const counterSeconds = 20
const refreshTime = 6 * time.Second
@@ -43,11 +44,10 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
counter := ratecounter.NewRateCounter(counterSeconds * time.Second)
randGenerator := rand.New(rand.NewSource(time.Now().Unix()))
var lastEmptyRequests int
// Step 1 - Sync to end of finalized epoch.
for s.chain.HeadSlot() < helpers.StartSlot(s.highestFinalizedEpoch()+1) {
root, finalizedEpoch, peers := s.bestFinalized()
root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
if len(peers) == 0 {
log.Warn("No peers; waiting for reconnect")
time.Sleep(refreshTime)
@@ -215,7 +215,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
log.Debug("Synced to finalized epoch - now syncing blocks up to current head")
if s.chain.HeadSlot() == slotsSinceGenesis(genesis) {
if s.chain.HeadSlot() == slotutil.SlotsSinceGenesis(genesis) {
return nil
}
@@ -226,19 +226,19 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
// we receive there after must build on the finalized chain or be considered invalid during
// fork choice resolution / block processing.
best := s.bestPeer()
root, _, _ := s.bestFinalized()
root, _, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
// if no best peer exists, retry until a new best peer is found.
for len(best) == 0 {
time.Sleep(refreshTime)
best = s.bestPeer()
root, _, _ = s.bestFinalized()
root, _, _ = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
}
for head := slotsSinceGenesis(genesis); s.chain.HeadSlot() < head; {
for head := slotutil.SlotsSinceGenesis(genesis); s.chain.HeadSlot() < head; {
req := &p2ppb.BeaconBlocksByRangeRequest{
HeadBlockRoot: root,
StartSlot: s.chain.HeadSlot() + 1,
Count: mathutil.Min(slotsSinceGenesis(genesis)-s.chain.HeadSlot()+1, 256),
Count: mathutil.Min(slotutil.SlotsSinceGenesis(genesis)-s.chain.HeadSlot()+1, 256),
Step: 1,
}
@@ -298,7 +298,7 @@ func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRa
// highestFinalizedEpoch as reported by peers. This is the absolute highest finalized epoch as
// reported by peers.
func (s *Service) highestFinalizedEpoch() uint64 {
_, epoch, _ := s.bestFinalized()
_, epoch, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
return epoch
}
@@ -333,7 +333,7 @@ func (s *Service) bestFinalized() ([]byte, uint64, []peer.ID) {
peerChainState, err := s.p2p.Peers().ChainState(pid)
if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch >= rootToEpoch[mostVotedFinalizedRoot] {
pids = append(pids, pid)
if len(pids) >= maxPeersToSync {
if len(pids) >= params.BeaconConfig().MaxPeersToSync {
break
}
}
@@ -363,7 +363,7 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncing
if rate == 0 {
rate = 1
}
timeRemaining := time.Duration(float64(slotsSinceGenesis(genesis)-blk.Slot)/rate) * time.Second
timeRemaining := time.Duration(float64(slotutil.SlotsSinceGenesis(genesis)-blk.Slot)/rate) * time.Second
log.WithField(
"peers",
fmt.Sprintf("%d/%d", len(syncingPeers), len(s.p2p.Peers().Connected())),
@@ -373,7 +373,7 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncing
).Infof(
"Processing block %d/%d - estimated time remaining %s",
blk.Slot,
slotsSinceGenesis(genesis),
slotutil.SlotsSinceGenesis(genesis),
timeRemaining,
)
}

View File

@@ -8,21 +8,19 @@ import (
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/go-ssz"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
)
@@ -42,7 +40,7 @@ func init() {
}
func TestConstants(t *testing.T) {
if maxPeersToSync*blockBatchSize > 1000 {
if params.BeaconConfig().MaxPeersToSync*blockBatchSize > 1000 {
t.Fatal("rpc rejects requests over 1000 range slots")
}
}
@@ -372,8 +370,8 @@ func makeGenesisTime(currentSlot uint64) time.Time {
func TestMakeGenesisTime(t *testing.T) {
currentSlot := uint64(64)
gt := makeGenesisTime(currentSlot)
if slotsSinceGenesis(gt) != currentSlot {
t.Fatalf("Wanted %d, got %d", currentSlot, slotsSinceGenesis(gt))
if slotutil.SlotsSinceGenesis(gt) != currentSlot {
t.Fatalf("Wanted %d, got %d", currentSlot, slotutil.SlotsSinceGenesis(gt))
}
}
@@ -425,23 +423,3 @@ func TestMakeSequence(t *testing.T) {
t.Fatalf("Wanted %v, got %v", want, got)
}
}
// TestBestFinalized_returnsMaxValue checks that the service-level bestFinalized helper
// never returns more than maxPeersToSync peers, even when many more are connected.
func TestBestFinalized_returnsMaxValue(t *testing.T) {
p := p2pt.NewTestP2P(t)
s := &Service{
p2p: p,
}
// Connect maxPeersToSync+101 peers, all reporting the same finalized epoch.
for i := 0; i <= maxPeersToSync+100; i++ {
s.p2p.Peers().Add(peer.ID(i), nil, network.DirOutbound)
s.p2p.Peers().SetConnectionState(peer.ID(i), peers.PeerConnected)
s.p2p.Peers().SetChainState(peer.ID(i), &pb.Status{
FinalizedEpoch: 10,
})
}
_, _, pids := s.bestFinalized()
// The returned peer list must be capped at exactly maxPeersToSync.
if len(pids) != maxPeersToSync {
t.Fatalf("returned wrong number of peers, wanted %d, got %d", maxPeersToSync, len(pids))
}
}

View File

@@ -2,10 +2,9 @@ package initialsync
import (
"context"
"errors"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
@@ -14,8 +13,9 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
)
var _ = shared.Service(&Service{})
@@ -101,7 +101,7 @@ func (s *Service) Start() {
time.Sleep(roughtime.Until(genesis))
}
s.chainStarted = true
currentSlot := slotsSinceGenesis(genesis)
currentSlot := slotutil.SlotsSinceGenesis(genesis)
if helpers.SlotToEpoch(currentSlot) == 0 {
log.Info("Chain started within the last epoch - not syncing")
s.synced = true
@@ -114,20 +114,7 @@ func (s *Service) Start() {
s.synced = true
return
}
// Every 5 sec, report handshake count.
for {
count := len(s.p2p.Peers().Connected())
if count >= flags.Get().MinimumSyncPeers {
break
}
log.WithField(
"handshakes",
fmt.Sprintf("%d/%d", count, flags.Get().MinimumSyncPeers),
).Info("Waiting for enough peer handshakes before syncing")
time.Sleep(handshakePollingInterval)
}
s.waitForMinimumPeers()
if err := s.roundRobinSync(genesis); err != nil {
panic(err)
}
@@ -154,6 +141,37 @@ func (s *Service) Syncing() bool {
return !s.synced
}
func slotsSinceGenesis(genesisTime time.Time) uint64 {
return uint64(roughtime.Since(genesisTime).Seconds()) / params.BeaconConfig().SecondsPerSlot
// Resync allows a node to start syncing again if it has fallen
// behind the current network head.
//
// It reads the genesis time from the current head state, waits until the minimum number of
// peers is connected, and then runs round-robin sync. The call blocks until syncing finishes.
//
// It returns an error if the head state cannot be retrieved or if round-robin sync fails. When
// the head state cannot be retrieved no sync is started and the synced flag is left untouched.
// When round-robin sync itself fails the service remains marked as not synced.
func (s *Service) Resync() error {
	headState, err := s.chain.HeadState(context.Background())
	if err != nil {
		return errors.Wrap(err, "could not retrieve head state")
	}
	genesis := time.Unix(int64(headState.GenesisTime), 0)

	// Only mark the service as syncing once we know a sync can actually be attempted;
	// otherwise a head state failure would leave the node reporting Syncing() forever.
	s.synced = false

	s.waitForMinimumPeers()
	if err := s.roundRobinSync(genesis); err != nil {
		return errors.Wrap(err, "could not sync with peers")
	}
	log.Infof("Synced up to slot %d", s.chain.HeadSlot())

	s.synced = true
	return nil
}
// waitForMinimumPeers blocks until the number of connected peers reaches the configured
// MinimumSyncPeers flag. While waiting it logs the current and required handshake counts
// and sleeps for handshakePollingInterval between checks.
func (s *Service) waitForMinimumPeers() {
	for {
		required := flags.Get().MinimumSyncPeers
		connected := len(s.p2p.Peers().Connected())
		if connected >= required {
			return
		}

		fields := logrus.Fields{
			"valid handshakes":    connected,
			"required handshakes": required,
		}
		log.WithFields(fields).Info("Waiting for enough peer handshakes before syncing")
		time.Sleep(handshakePollingInterval)
	}
}

View File

@@ -14,3 +14,8 @@ func (s *Sync) Syncing() bool {
func (s *Sync) Status() error {
return nil
}
// Resync is a no-op in this implementation; it always returns nil.
func (s *Sync) Resync() error {
return nil
}

View File

@@ -34,4 +34,10 @@ var (
},
[]string{"topic"},
)
numberOfTimesResyncedCounter = promauto.NewCounter(
prometheus.CounterOpts{
Name: "number_of_times_resynced",
Help: "Count the number of times a node resyncs.",
},
)
)

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -146,3 +147,10 @@ func (r *Service) validatePendingSlots() error {
oldBlockRoots = nil
return nil
}
// clearPendingSlots drops all pending-block bookkeeping by replacing both the
// slot-to-block map and the seen-roots map with fresh empty maps. The swap is
// performed while holding pendingQueueLock.
func (r *Service) clearPendingSlots() {
	r.pendingQueueLock.Lock()
	defer r.pendingQueueLock.Unlock()

	r.seenPendingBlocks = map[[32]byte]bool{}
	r.slotToPendingBlocks = map[uint64]*ethpb.BeaconBlock{}
}

View File

@@ -9,13 +9,15 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
)
const statusInterval = 6 * time.Minute // 60 slots.
const statusInterval = 6 * time.Minute // 30 slots.
// maintainPeerStatuses by infrequently polling peers for their latest status.
func (r *Service) maintainPeerStatuses() {
@@ -34,6 +36,17 @@ func (r *Service) maintainPeerStatuses() {
}
}
}
if !r.initialSync.Syncing() {
_, highestEpoch, _ := r.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
if helpers.StartSlot(highestEpoch) > r.chain.HeadSlot() {
numberOfTimesResyncedCounter.Inc()
r.clearPendingSlots()
// block until we can resync the node
if err := r.initialSync.Resync(); err != nil {
log.Errorf("Could not Resync Chain: %v", err)
}
}
}
})
}
@@ -134,5 +147,13 @@ func (r *Service) validateStatusMessage(msg *pb.Status, stream network.Stream) e
if !bytes.Equal(params.BeaconConfig().GenesisForkVersion, msg.HeadForkVersion) {
return errWrongForkVersion
}
genesis := r.chain.GenesisTime()
maxEpoch := slotutil.EpochsSinceGenesis(genesis)
// It would take a minimum of 2 epochs to finalize a
// previous epoch
maxFinalizedEpoch := maxEpoch - 2
if msg.FinalizedEpoch > maxFinalizedEpoch {
return errInvalidEpoch
}
return nil
}

View File

@@ -103,4 +103,5 @@ func (r *Service) Status() error {
type Checker interface {
// Syncing reports whether the implementation is currently syncing.
Syncing() bool
// Status returns an error describing the implementation's state, or nil.
// NOTE(review): exact semantics are defined by the implementations — confirm there.
Status() error
// Resync asks the implementation to start syncing again and returns any error
// encountered while doing so.
Resync() error
}

View File

@@ -92,6 +92,7 @@ type BeaconChainConfig struct {
EmptySignature [96]byte // EmptySignature is used to represent a zeroed out BLS Signature.
DefaultPageSize int // DefaultPageSize defines the default page size for RPC server request.
MaxPageSize int // MaxPageSize defines the max page size for RPC server respond.
MaxPeersToSync int // MaxPeersToSync describes the limit for number of peers in round robin sync.
// Slasher constants.
WeakSubjectivityPeriod uint64 // WeakSubjectivityPeriod defines the time period expressed in number of epochs were proof of stake network should validate block headers and attestations for slashable events.
@@ -186,6 +187,7 @@ var defaultBeaconConfig = &BeaconChainConfig{
EmptySignature: [96]byte{},
DefaultPageSize: 250,
MaxPageSize: 500,
MaxPeersToSync: 15,
// Slasher related values.
WeakSubjectivityPeriod: 54000,

View File

@@ -4,6 +4,7 @@ import (
"time"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
)
// SlotStartTime returns the start time in terms of its unix epoch
@@ -13,3 +14,15 @@ func SlotStartTime(genesis uint64, slot uint64) time.Time {
startTime := time.Unix(int64(genesis), 0).Add(duration)
return startTime
}
// SlotsSinceGenesis returns the number of whole slots that have elapsed since
// the provided genesis time, using the configured SecondsPerSlot.
//
// If genesis lies in the future the elapsed duration is negative; converting a negative
// floating-point value to uint64 is implementation-dependent in Go, so 0 is returned
// explicitly in that case.
func SlotsSinceGenesis(genesis time.Time) uint64 {
	elapsed := roughtime.Since(genesis)
	if elapsed < 0 {
		// Genesis has not occurred yet.
		return 0
	}
	return uint64(elapsed.Seconds()) / params.BeaconConfig().SecondsPerSlot
}
// EpochsSinceGenesis returns the number of whole epochs since
// the provided genesis time, computed as the slots since genesis
// divided by the configured SlotsPerEpoch.
func EpochsSinceGenesis(genesis time.Time) uint64 {
return SlotsSinceGenesis(genesis) / params.BeaconConfig().SlotsPerEpoch
}