First pass: single peer initial sync (#3363)

* lint

* add requests

* add all new stuff

* comment

* preston's review

* initial commit

* reorder sync so it isn't required to wait until start

* checkpoint

* fix

* improved handler API

* Set up prechain start values

* improved handler API

* ooops

* successful peer handshakes

* successful peer handshakes

* successful peer handshakes

* checkpoint

* chpkt

* handle init after chain start

* emit state initialized feed if existing db state

* merge error

* Done

* Test

* Fixed test

* emit state initialized

* force fork choice update

* wait for genesis time

* sync to current slot

* Use saved head in DB

* gaz

* fix tests

* lint

* lint

* lint

* lint

* Revert "Use saved head in DB"

This reverts commit c5f3404fdf.

* remove db

* lint

* remove unused interfaces from composite

* resolve comments
This commit is contained in:
Preston Van Loon
2019-08-30 13:15:40 -07:00
committed by Raul Jordan
parent 205fe1baa5
commit 95c528f0bc
17 changed files with 316 additions and 46 deletions

View File

@@ -23,6 +23,7 @@ go_library(
"//beacon-chain/powchain:go_default_library",
"//beacon-chain/rpc:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/initial-sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared:go_default_library",

View File

@@ -28,6 +28,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
initialsync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/debug"
@@ -114,6 +115,10 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}
if err := beacon.registerInitialSyncService(ctx); err != nil {
return nil, err
}
if err := beacon.registerRPCService(ctx); err != nil {
return nil, err
}
@@ -415,6 +420,34 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
return b.services.RegisterService(syncService)
}
func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error {
var attsService *attestation.Service
if err := b.services.FetchService(&attsService); err != nil {
return err
}
if featureconfig.FeatureConfig().UseNewSync {
var chainService *blockchain.ChainService
if err := b.services.FetchService(&chainService); err != nil {
return err
}
var regSync *prysmsync.RegularSync
if err := b.services.FetchService(&regSync); err != nil {
return err
}
is := initialsync.NewInitialSync(&initialsync.Config{
Chain: chainService,
RegSync: regSync,
P2P: b.fetchP2P(ctx),
})
return b.services.RegisterService(is)
}
return nil
}
func (b *BeaconNode) registerRPCService(ctx *cli.Context) error {
var chainService interface{}
if featureconfig.FeatureConfig().UseNewBlockChainService {

View File

@@ -60,6 +60,7 @@ go_test(
"service_test.go",
],
embed = [":go_default_library"],
tags = ["block-network"],
deps = [
"//beacon-chain/p2p/testing:go_default_library",
"//proto/testing:go_default_library",

View File

@@ -139,8 +139,8 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) {
}
}()
cfg.Port = 4000
cfg.UDPPort = 4000
cfg.Port = 14000
cfg.UDPPort = 14000
cfg.StaticPeers = staticPeers
s, err := NewService(cfg)

View File

@@ -40,8 +40,9 @@ func (p *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
log.WithError(err).Error("Could not send successful hello rpc request")
if err := p.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer())
return
}
return
log.WithField("peer", conn.RemotePeer().Pretty()).Info("New peer connected.")
}
}()
},

View File

@@ -22,8 +22,6 @@ type P2P interface {
Sender
ConnectionHandler
DeprecatedSubscriber
Started() bool
}
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.

View File

@@ -40,35 +40,26 @@ type Service struct {
// connections are made until the Start function is called during the service registry startup.
func NewService(cfg *Config) (*Service, error) {
ctx, cancel := context.WithCancel(context.Background())
return &Service{
s := &Service{
ctx: ctx,
cancel: cancel,
cfg: cfg,
}, nil
}
// Start the p2p service.
func (s *Service) Start() {
if s.started {
log.Error("Attempted to start p2p service when it was already started")
return
}
ipAddr := ipAddr(s.cfg)
privKey, err := privKey(s.cfg)
if err != nil {
s.startupErr = err
log.WithError(err).Error("Failed to generate p2p private key")
return
return nil, err
}
// TODO(3147): Add host options
opts := buildOptions(s.cfg, ipAddr, privKey)
h, err := libp2p.New(s.ctx, opts...)
if err != nil {
s.startupErr = err
log.WithError(err).Error("Failed to create p2p host")
return
return nil, err
}
s.host = h
@@ -79,14 +70,30 @@ func (s *Service) Start() {
// object.
gs, err := pubsub.NewGossipSub(s.ctx, s.host)
if err != nil {
s.startupErr = err
log.WithError(err).Error("Failed to start pubsub")
return
return nil, err
}
s.pubsub = gs
return s, nil
}
// Start the p2p service.
func (s *Service) Start() {
if s.started {
log.Error("Attempted to start p2p service when it was already started")
return
}
if s.cfg.BootstrapNodeAddr != "" && !s.cfg.NoDiscovery {
ipAddr := ipAddr(s.cfg)
privKey, err := privKey(s.cfg)
if err != nil {
s.startupErr = err
log.WithError(err).Error("Failed to generate p2p private key")
return
}
listener, err := startDiscoveryV5(ipAddr, privKey, s.cfg)
if err != nil {
log.WithError(err).Error("Failed to start discovery")

View File

@@ -77,7 +77,7 @@ func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
}
func TestService_Stop_SetsStartedToFalse(t *testing.T) {
s, _ := NewService(nil)
s, _ := NewService(&Config{})
s.started = true
s.dv5Listener = &mockListener{}
_ = s.Stop()
@@ -88,7 +88,7 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) {
}
func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) {
s, _ := NewService(nil)
s, _ := NewService(&Config{})
_ = s.Stop()
}
@@ -149,8 +149,8 @@ func TestListenForNewNodes(t *testing.T) {
}
}()
cfg.Port = 4000
cfg.UDPPort = 4000
cfg.Port = 14000
cfg.UDPPort = 14000
s, err := NewService(cfg)
if err != nil {

View File

@@ -35,6 +35,7 @@ go_library(
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared:go_default_library",
@@ -81,6 +82,7 @@ go_test(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",

View File

@@ -5,6 +5,7 @@ import (
"errors"
"io"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
@@ -25,7 +26,8 @@ func (r *RegularSync) generateErrorResponse(code byte, reason string) ([]byte, e
return buf.Bytes(), nil
}
func (r *RegularSync) readStatusCode(stream io.Reader) (uint8, *pb.ErrorMessage, error) {
// ReadStatusCode response from a RPC stream.
func ReadStatusCode(stream io.Reader, encoding encoder.NetworkEncoding) (uint8, *pb.ErrorMessage, error) {
b := make([]byte, 1)
_, err := stream.Read(b)
if err != nil {
@@ -37,7 +39,7 @@ func (r *RegularSync) readStatusCode(stream io.Reader) (uint8, *pb.ErrorMessage,
}
msg := &pb.ErrorMessage{}
if err := r.p2p.Encoding().Decode(stream, msg); err != nil {
if err := encoding.Decode(stream, msg); err != nil {
return 0, nil, err
}

View File

@@ -0,0 +1,25 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"log.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -0,0 +1,7 @@
package initialsync
import (
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "initial-sync")

View File

@@ -0,0 +1,178 @@
package initialsync
import (
"context"
"fmt"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
eth "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
)
var _ = shared.Service(&InitialSync{})
type blockchainService interface {
blockchain.BlockReceiver
blockchain.HeadRetriever
blockchain.ChainFeeds
}
const (
minHelloCount = 1 // TODO(3147): Set this to more than 1, maybe configure from flag?
handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes.
)
// Config to set up the initial sync service.
type Config struct {
P2P p2p.P2P
DB db.Database
Chain blockchainService
RegSync sync.HelloTracker
}
// InitialSync service.
type InitialSync struct {
helloTracker sync.HelloTracker
chain blockchainService
p2p p2p.P2P
}
// NewInitialSync configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewInitialSync(cfg *Config) *InitialSync {
return &InitialSync{
helloTracker: cfg.RegSync,
chain: cfg.Chain,
p2p: cfg.P2P,
}
}
// Start the initial sync service.
func (s *InitialSync) Start() {
ch := make(chan time.Time)
sub := s.chain.StateInitializedFeed().Subscribe(ch)
defer sub.Unsubscribe()
// Wait until chain start.
genesis := <-ch
if genesis.After(roughtime.Now()) {
time.Sleep(roughtime.Until(genesis))
}
currentSlot := slotsSinceGenesis(genesis)
if helpers.SlotToEpoch(currentSlot) == 0 {
log.Info("Chain started within the last epoch. Not syncing.")
return
}
// Are we already in sync, or close to it?
if helpers.SlotToEpoch(s.chain.HeadSlot()) == helpers.SlotToEpoch(currentSlot) {
log.Info("Already synced to the current epoch.")
return
}
// Every 5 sec, report handshake count.
for {
helloCount := len(s.helloTracker.Hellos())
log.WithField(
"hellos",
fmt.Sprintf("%d/%d", helloCount, minHelloCount),
).Info("Waiting for enough peer handshakes before syncing.")
if helloCount >= minHelloCount {
break
}
time.Sleep(handshakePollingInterval)
}
pid, best := bestHello(s.helloTracker.Hellos())
var last *eth.BeaconBlock
for headSlot := s.chain.HeadSlot(); headSlot < slotsSinceGenesis(genesis); {
req := &pb.BeaconBlocksRequest{
HeadSlot: headSlot,
HeadBlockRoot: s.chain.HeadRoot(),
Count: 64,
Step: 1,
}
log.WithField("data", fmt.Sprintf("%+v", req)).Info("Sending msg")
strm, err := s.p2p.Send(context.Background(), req, pid)
if err != nil {
panic(err)
}
// Read status code.
code, errMsg, err := sync.ReadStatusCode(strm, s.p2p.Encoding())
if err != nil {
panic(err)
}
if code != 0 {
log.Errorf("Request failed. Request was %+v", req)
panic(errMsg.ErrorMessage)
}
resp := &pb.BeaconBlocksResponse{}
if err := s.p2p.Encoding().Decode(strm, resp); err != nil {
panic(err)
}
log.Infof("Received %d blocks", len(resp.Blocks))
for _, blk := range resp.Blocks {
if blk.Slot <= headSlot {
continue
}
if blk.Slot < helpers.StartSlot(best.FinalizedEpoch+1) {
if err := s.chain.ReceiveBlockNoPubsubForkchoice(context.Background(), blk); err != nil {
panic(err)
}
} else {
if err := s.chain.ReceiveBlockNoPubsub(context.Background(), blk); err != nil {
panic(err)
}
}
last = blk
}
headSlot = s.chain.HeadSlot()
}
// Force a fork choice update since fork choice was not run during initial sync.
if err := s.chain.ReceiveBlockNoPubsub(context.Background(), last); err != nil {
panic(err)
}
log.Infof("Synced up to %d", s.chain.HeadSlot())
}
// Stop initial sync.
func (s *InitialSync) Stop() error {
return nil
}
// Status of initial sync.
func (s *InitialSync) Status() error {
return nil
}
func bestHello(data map[peer.ID]*pb.Hello) (peer.ID, *pb.Hello) {
for pid, hello := range data {
return pid, hello
}
return "", nil
}
func slotsSinceGenesis(genesisTime time.Time) uint64 {
return uint64(roughtime.Since(genesisTime).Seconds()) / params.BeaconConfig().SecondsPerSlot
}

View File

@@ -40,7 +40,7 @@ func (r *RegularSync) sendRPCHelloRequest(ctx context.Context, id peer.ID) error
return err
}
code, errMsg, err := r.readStatusCode(stream)
code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())
if err != nil {
return err
}
@@ -53,9 +53,9 @@ func (r *RegularSync) sendRPCHelloRequest(ctx context.Context, id peer.ID) error
if err := r.p2p.Encoding().Decode(stream, msg); err != nil {
return err
}
r.helloTrackerLock.RLock()
r.helloTrackerLock.Lock()
r.helloTracker[stream.Conn().RemotePeer()] = msg
r.helloTrackerLock.RUnlock()
r.helloTrackerLock.Unlock()
return r.validateHelloMessage(msg, stream)
}
@@ -70,7 +70,9 @@ func (r *RegularSync) helloRPCHandler(ctx context.Context, msg proto.Message, st
log := log.WithField("rpc", "hello")
// return if hello already exists
r.helloTrackerLock.RLock()
hello := r.helloTracker[stream.Conn().RemotePeer()]
r.helloTrackerLock.RUnlock()
if hello != nil {
log.Debugf("Peer %s already exists", stream.Conn().RemotePeer())
return nil
@@ -78,9 +80,9 @@ func (r *RegularSync) helloRPCHandler(ctx context.Context, msg proto.Message, st
m := msg.(*pb.Hello)
r.helloTrackerLock.RLock()
r.helloTrackerLock.Lock()
r.helloTracker[stream.Conn().RemotePeer()] = m
r.helloTrackerLock.RUnlock()
r.helloTrackerLock.Unlock()
if err := r.validateHelloMessage(m, stream); err != nil {
originalErr := err
@@ -102,6 +104,10 @@ func (r *RegularSync) helloRPCHandler(ctx context.Context, msg proto.Message, st
return originalErr
}
r.helloTrackerLock.Lock()
r.helloTracker[stream.Conn().RemotePeer()] = m
r.helloTrackerLock.Unlock()
r.p2p.AddHandshake(stream.Conn().RemotePeer(), m)
resp := &pb.Hello{

View File

@@ -35,7 +35,7 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
wg.Add(1)
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
code, errMsg, err := r.readStatusCode(stream)
code, errMsg, err := ReadStatusCode(stream, p1.Encoding())
if err != nil {
t.Fatal(err)
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/gogo/protobuf/proto"
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
pb "github.com/prysmaticlabs/prysm/proto/testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -17,7 +18,7 @@ import (
// expectSuccess status code from a stream in regular sync.
func expectSuccess(t *testing.T, r *RegularSync, stream network.Stream) {
code, errMsg, err := r.readStatusCode(stream)
code, errMsg, err := ReadStatusCode(stream, &encoder.SszNetworkEncoder{})
if err != nil {
t.Fatal(err)
}
@@ -32,7 +33,7 @@ func expectSuccess(t *testing.T, r *RegularSync, stream network.Stream) {
// expectResetStream status code from a stream in regular sync.
func expectResetStream(t *testing.T, r *RegularSync, stream network.Stream) {
expectedErr := "stream reset"
_, _, err := r.readStatusCode(stream)
_, _, err := ReadStatusCode(stream, &encoder.SszNetworkEncoder{})
if err.Error() != expectedErr {
t.Fatalf("Wanted this error %s but got %v instead", expectedErr, err)
}

View File

@@ -3,7 +3,6 @@ package sync
import (
"context"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
@@ -34,7 +33,7 @@ type blockchainService interface {
// NewRegularSync service.
func NewRegularSync(cfg *Config) *RegularSync {
return &RegularSync{
r := &RegularSync{
ctx: context.Background(),
db: cfg.DB,
p2p: cfg.P2P,
@@ -42,6 +41,11 @@ func NewRegularSync(cfg *Config) *RegularSync {
chain: cfg.Chain,
helloTracker: make(map[peer.ID]*pb.Hello),
}
r.registerRPCHandlers()
r.registerSubscribers()
return r
}
// RegularSync service is responsible for handling all run time p2p related operations as the
@@ -56,17 +60,9 @@ type RegularSync struct {
helloTrackerLock sync.RWMutex
}
// Start the regular sync service by initializing all of the p2p sync handlers.
// Start the regular sync service.
func (r *RegularSync) Start() {
log.Info("Starting regular sync")
for !r.p2p.Started() {
time.Sleep(200 * time.Millisecond)
}
// Add connection handler for new peers
r.p2p.AddConnectionHandler(r.sendRPCHelloRequest)
r.registerRPCHandlers()
r.registerSubscribers()
log.Info("Regular sync started")
}
// Stop the regular sync service.
@@ -85,9 +81,21 @@ func (r *RegularSync) Syncing() bool {
return false
}
// Hellos returns the map of hello messages received so far.
func (r *RegularSync) Hellos() map[peer.ID]*pb.Hello {
r.helloTrackerLock.RLock()
defer r.helloTrackerLock.RUnlock()
return r.helloTracker
}
// Checker defines a struct which can verify whether a node is currently
// synchronizing a chain with the rest of peers in the network.
type Checker interface {
Syncing() bool
Status() error
}
// HelloTracker interface for accessing the hello / handshake messages received so far.
type HelloTracker interface {
Hellos() map[peer.ID]*pb.Hello
}