Parent blocks fetching/processing (#3459)

* first version of the watchtower api

* Initial prototype of sync parent fetching/processing

* Another map to track seen block root

* Fixed fmt

* Ready to live test

* Ready to live test

* Separate pending block queue into its own

* first version

* delete watchtower

* move to message loop

* roughtime

* one time

* fix test

* Started testing but peer list empty

* Comment

* Logging

* Stuck at decoding non proto type

* Revert

* First take, need feedback

* Run time panics at hello

* Revert

* use reflect properly

* Fixed subscriber

* instantiate helper

* More reverts

* Revert back tests

* Cont when EOF

* Working

* Clean hello tracker on peer disconnection

* Clean hello tracker on peer disconnection

* Move to validation

* Proper locking

* Proper locking

* Fmt

* Nishant's feedback

* More feedback

* All tests passing

* fix build

* remove log

* gaz

* Added the todo
terence tsao
2019-09-20 10:08:32 -07:00
committed by Preston Van Loon
parent a2aa142b90
commit cf2ad1f21c
10 changed files with 360 additions and 42 deletions
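
At a high level, this change drops the old ccache-based pending block cache and instead buffers any block whose parent is unknown in two maps on RegularSync (slotToPendingBlocks and seenPendingBlocks), both guarded by pendingQueueLock, while a ticker-driven goroutine requests missing parents from a peer and processes queued blocks once their parents arrive. Below is a minimal standalone sketch of that bookkeeping only; the wrapper type pendingBlockCache and its enqueue/dequeue helpers are illustrative assumptions, since in the diff the maps and the lock live directly on RegularSync.

package sync

import (
	"sync"

	ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
)

// pendingBlockCache is an illustrative stand-in for the fields this change adds
// to RegularSync: pending blocks keyed by slot, plus the set of block roots that
// are already queued.
type pendingBlockCache struct {
	lock                sync.RWMutex
	slotToPendingBlocks map[uint64]*ethpb.BeaconBlock
	seenPendingBlocks   map[[32]byte]bool
}

// enqueue records a block whose parent is not yet in the local database.
func (c *pendingBlockCache) enqueue(blk *ethpb.BeaconBlock, root [32]byte) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.slotToPendingBlocks[blk.Slot] = blk
	c.seenPendingBlocks[root] = true
}

// dequeue removes a block after it has been processed.
func (c *pendingBlockCache) dequeue(slot uint64, root [32]byte) {
	c.lock.Lock()
	defer c.lock.Unlock()
	delete(c.slotToPendingBlocks, slot)
	delete(c.seenPendingBlocks, root)
}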

View File

@@ -8,6 +8,7 @@ go_library(
"error.go",
"log.go",
"metrics.go",
"pending_blocks_queue.go",
"rpc.go",
"rpc_beacon_blocks_by_range.go",
"rpc_beacon_blocks_by_root.go",
@@ -49,6 +50,7 @@ go_library(
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
@@ -63,6 +65,7 @@ go_test(
size = "small",
srcs = [
"error_test.go",
"pending_blocks_queue_test.go",
"rpc_beacon_blocks_by_range_test.go",
"rpc_beacon_blocks_by_root_test.go",
"rpc_goodbye_test.go",

View File

@@ -0,0 +1,105 @@
package sync
import (
"context"
"encoding/hex"
"sort"
"time"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)
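// processPendingBlocksPeriod is how often the pending block queue is drained,
// currently a third of a slot.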
var processPendingBlocksPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/3) * time.Second
// processPendingBlocksQueue processes the pending block queue every processPendingBlocksPeriod.
func (r *RegularSync) processPendingBlocksQueue() {
ticker := time.NewTicker(processPendingBlocksPeriod)
for {
ctx := context.TODO()
select {
case <-ticker.C:
r.processPendingBlocks(ctx)
case <-r.ctx.Done():
log.Debug("p2p context is closed, exiting routine")
return
}
}
}
// processPendingBlocks processes the block tree inside the queue.
func (r *RegularSync) processPendingBlocks(ctx context.Context) error {
pids := r.peerIDs()
slots := r.sortedPendingSlots()
for _, s := range slots {
r.pendingQueueLock.RLock()
b := r.slotToPendingBlocks[uint64(s)]
inPendingQueue := r.seenPendingBlocks[bytesutil.ToBytes32(b.ParentRoot)]
r.pendingQueueLock.RUnlock()
inDB := r.db.HasBlock(ctx, bytesutil.ToBytes32(b.ParentRoot))
hasPeer := len(pids) != 0
// Only request the missing parent block if it is not in the DB, not already in
// the pending cache, and there is at least one peer in the peer list.
if !inPendingQueue && !inDB && hasPeer {
log.WithFields(logrus.Fields{
"currentSlot": b.Slot,
"parentRoot": hex.EncodeToString(b.ParentRoot),
}).Info("Requesting parent block")
req := [][32]byte{bytesutil.ToBytes32(b.ParentRoot)}
// TODO(3450): Use round robin sync API to rotate peers for sending recent block request
if err := r.sendRecentBeaconBlocksRequest(ctx, req, pids[0]); err != nil {
log.Errorf("Could not send recent block request: %v", err)
}
continue
}
if !inDB {
continue
}
if err := r.chain.ReceiveBlockNoPubsub(ctx, b); err != nil {
log.Errorf("Could not process block from slot %d: %v", b.Slot, err)
}
r.pendingQueueLock.Lock()
delete(r.slotToPendingBlocks, uint64(s))
blkRoot, err := ssz.SigningRoot(b)
if err != nil {
return err
}
delete(r.seenPendingBlocks, blkRoot)
r.pendingQueueLock.Unlock()
log.Infof("Processed ancestor block %d and cleared pending block cache", s)
}
return nil
}
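// peerIDs returns the IDs of all peers that have completed the status (hello)
// handshake, i.e. every peer a recent-blocks request could be sent to.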
func (r *RegularSync) peerIDs() []peer.ID {
hellos := r.PeerStatuses()
pids := make([]peer.ID, 0, len(hellos))
for pid := range hellos {
pids = append(pids, pid)
}
return pids
}
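// sortedPendingSlots returns the slots of all queued blocks in ascending order,
// so a parent at a lower slot is attempted before its descendants.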
func (r *RegularSync) sortedPendingSlots() []int {
r.pendingQueueLock.RLock()
defer r.pendingQueueLock.RUnlock()
slots := make([]int, 0, len(r.slotToPendingBlocks))
for s := range r.slotToPendingBlocks {
slots = append(slots, int(s))
}
sort.Ints(slots)
return slots
}

View File

@@ -0,0 +1,195 @@
package sync
import (
"context"
"sync"
"testing"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
protocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/sirupsen/logrus"
)
func init() {
logrus.SetLevel(logrus.DebugLevel)
}
// /- b1 - b2
// b0
// \- b3
// Tests the case where b1 was missing, then received, and we can process b0 -> b1 -> b2.
func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) {
db := dbtest.SetupDB(t)
defer dbtest.TeardownDB(t, db)
r := &RegularSync{
db: db,
chain: &mock.ChainService{},
slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}
b0 := &ethpb.BeaconBlock{}
if err := r.db.SaveBlock(context.Background(), b0); err != nil {
t.Fatal(err)
}
b0Root, _ := ssz.SigningRoot(b0)
b3 := &ethpb.BeaconBlock{Slot: 3, ParentRoot: b0Root[:]}
if err := r.db.SaveBlock(context.Background(), b3); err != nil {
t.Fatal(err)
}
// Incomplete block link
b1 := &ethpb.BeaconBlock{Slot: 1, ParentRoot: b0Root[:]}
b1Root, _ := ssz.SigningRoot(b1)
b2 := &ethpb.BeaconBlock{Slot: 2, ParentRoot: b1Root[:]}
b2Root, _ := ssz.SigningRoot(b2)
// Add b2 to the cache
r.slotToPendingBlocks[b2.Slot] = b2
r.seenPendingBlocks[b2Root] = true
if err := r.processPendingBlocks(context.Background()); err != nil {
t.Fatal(err)
}
if len(r.slotToPendingBlocks) != 1 {
t.Errorf("Incorrect size for slot to pending blocks cache: got %d", len(r.slotToPendingBlocks))
}
if len(r.seenPendingBlocks) != 1 {
t.Errorf("Incorrect size for seen pending block: got %d", len(r.seenPendingBlocks))
}
// Add b1 to the cache
r.slotToPendingBlocks[b1.Slot] = b1
r.seenPendingBlocks[b1Root] = true
if err := r.db.SaveBlock(context.Background(), b1); err != nil {
t.Fatal(err)
}
if err := r.processPendingBlocks(context.Background()); err != nil {
t.Fatal(err)
}
if len(r.slotToPendingBlocks) != 0 {
t.Errorf("Incorrect size for slot to pending blocks cache: got %d", len(r.slotToPendingBlocks))
}
if len(r.seenPendingBlocks) != 0 {
t.Errorf("Incorrect size for seen pending block: got %d", len(r.seenPendingBlocks))
}
}
// /- b1 - b2 - b5
// b0
// \- b3 - b4
// Tests the case where b2 and b3 were missed; after receiving them we can process both chains.
func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) {
db := dbtest.SetupDB(t)
defer dbtest.TeardownDB(t, db)
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
if len(p1.Host.Network().Peers()) != 1 {
t.Error("Expected peers to be connected")
}
pcl := protocol.ID("/eth2/beacon_chain/req/hello/1/ssz")
var wg sync.WaitGroup
wg.Add(1)
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
code, errMsg, err := ReadStatusCode(stream, p1.Encoding())
if err != nil {
t.Fatal(err)
}
if code == 0 {
t.Error("Expected a non-zero code")
}
if errMsg != errWrongForkVersion.Error() {
t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkVersion.Error()))
t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkVersion.Error())
}
})
r := &RegularSync{
p2p: p1,
db: db,
chain: &mock.ChainService{},
slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
statusTracker: make(map[peer.ID]*pb.Status),
}
r.statusTracker[peer.ID(1)] = &pb.Status{}
b0 := &ethpb.BeaconBlock{}
if err := r.db.SaveBlock(context.Background(), b0); err != nil {
t.Fatal(err)
}
b0Root, _ := ssz.SigningRoot(b0)
b1 := &ethpb.BeaconBlock{Slot: 1, ParentRoot: b0Root[:]}
if err := r.db.SaveBlock(context.Background(), b1); err != nil {
t.Fatal(err)
}
b1Root, _ := ssz.SigningRoot(b1)
// Incomplete block links
b2 := &ethpb.BeaconBlock{Slot: 2, ParentRoot: b1Root[:]}
b2Root, _ := ssz.SigningRoot(b2)
b5 := &ethpb.BeaconBlock{Slot: 5, ParentRoot: b2Root[:]}
b5Root, _ := ssz.SigningRoot(b5)
b3 := &ethpb.BeaconBlock{Slot: 3, ParentRoot: b0Root[:]}
b3Root, _ := ssz.SigningRoot(b3)
b4 := &ethpb.BeaconBlock{Slot: 4, ParentRoot: b3Root[:]}
b4Root, _ := ssz.SigningRoot(b4)
r.slotToPendingBlocks[b4.Slot] = b4
r.seenPendingBlocks[b4Root] = true
r.slotToPendingBlocks[b5.Slot] = b5
r.seenPendingBlocks[b5Root] = true
if err := r.processPendingBlocks(context.Background()); err != nil {
t.Fatal(err)
}
if len(r.slotToPendingBlocks) != 2 {
t.Errorf("Incorrect size for slot to pending blocks cache: got %d", len(r.slotToPendingBlocks))
}
if len(r.seenPendingBlocks) != 2 {
t.Errorf("Incorrect size for seen pending block: got %d", len(r.seenPendingBlocks))
}
// Add b3 to the cache
r.slotToPendingBlocks[b3.Slot] = b3
r.seenPendingBlocks[b3Root] = true
if err := r.db.SaveBlock(context.Background(), b3); err != nil {
t.Fatal(err)
}
if err := r.processPendingBlocks(context.Background()); err != nil {
t.Fatal(err)
}
if len(r.slotToPendingBlocks) != 1 {
t.Errorf("Incorrect size for slot to pending blocks cache: got %d", len(r.slotToPendingBlocks))
}
if len(r.seenPendingBlocks) != 1 {
t.Errorf("Incorrect size for seen pending block: got %d", len(r.seenPendingBlocks))
}
// Add b2 to the cache
r.slotToPendingBlocks[b2.Slot] = b2
r.seenPendingBlocks[b2Root] = true
if err := r.db.SaveBlock(context.Background(), b2); err != nil {
t.Fatal(err)
}
if err := r.processPendingBlocks(context.Background()); err != nil {
t.Fatal(err)
}
if len(r.slotToPendingBlocks) != 0 {
t.Errorf("Incorrect size for slot to pending blocks cache: got %d", len(r.slotToPendingBlocks))
}
t.Log(r.seenPendingBlocks)
if len(r.seenPendingBlocks) != 0 {
t.Errorf("Incorrect size for seen pending block: got %d", len(r.seenPendingBlocks))
}
}

View File

@@ -86,5 +86,6 @@ func (r *RegularSync) registerRPC(topic string, base interface{}, handle rpcHand
log.WithError(err).Error("Failed to handle p2p RPC")
}
}
})
}

View File

@@ -7,14 +7,13 @@ import (
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz"
eth "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
)
// sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer
// to fetch the corresponding blocks from that peer.
func (r *RegularSync) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots [][32]byte, id peer.ID) error {
log := log.WithField("rpc", "beacon_blocks_by_root")
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
@@ -36,11 +35,16 @@ func (r *RegularSync) sendRecentBeaconBlocksRequest(ctx context.Context, blockRo
if err := r.p2p.Encoding().DecodeWithLength(stream, &resp); err != nil {
return err
}
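// Rather than processing the returned blocks immediately, buffer them in the
// pending queue; processPendingBlocks picks them up on its next tick.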
r.pendingQueueLock.Lock()
defer r.pendingQueueLock.Unlock()
for _, blk := range resp {
if err := r.chain.ReceiveBlock(ctx, blk); err != nil {
log.WithError(err).Error("Unable to process block")
return nil
r.slotToPendingBlocks[blk.Slot] = blk
blkRoot, err := ssz.SigningRoot(blk)
if err != nil {
return err
}
r.seenPendingBlocks[blkRoot] = true
}
return nil

View File

@@ -119,8 +119,10 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
FinalizedCheckPoint: finalizedCheckpt,
Root: blockARoot[:],
},
statusTracker: make(map[peer.ID]*pb.Status),
ctx: context.Background(),
statusTracker: make(map[peer.ID]*pb.Status),
slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
ctx: context.Background(),
}
// Setup streams

View File

@@ -10,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared"
)
@@ -35,12 +36,14 @@ type blockchainService interface {
// NewRegularSync service.
func NewRegularSync(cfg *Config) *RegularSync {
r := &RegularSync{
ctx: context.Background(),
db: cfg.DB,
p2p: cfg.P2P,
operations: cfg.Operations,
chain: cfg.Chain,
statusTracker: make(map[peer.ID]*pb.Status),
ctx: context.Background(),
db: cfg.DB,
p2p: cfg.P2P,
operations: cfg.Operations,
chain: cfg.Chain,
statusTracker: make(map[peer.ID]*pb.Status),
slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}
r.registerRPCHandlers()
@@ -52,20 +55,24 @@ func NewRegularSync(cfg *Config) *RegularSync {
// RegularSync service is responsible for handling all run time p2p related operations as the
// main entry point for network messages.
type RegularSync struct {
ctx context.Context
p2p p2p.P2P
db db.Database
operations *operations.Service
chain blockchainService
statusTracker map[peer.ID]*pb.Status
statusTrackerLock sync.RWMutex
chainStarted bool
ctx context.Context
p2p p2p.P2P
db db.Database
operations *operations.Service
chain blockchainService
statusTracker map[peer.ID]*pb.Status
statusTrackerLock sync.RWMutex
slotToPendingBlocks map[uint64]*ethpb.BeaconBlock
seenPendingBlocks map[[32]byte]bool
pendingQueueLock sync.RWMutex
chainStarted bool
}
// Start the regular sync service.
func (r *RegularSync) Start() {
r.p2p.AddConnectionHandler(r.sendRPCStatusRequest)
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
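// Drain the pending block queue in the background for the lifetime of the service.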
go r.processPendingBlocksQueue()
}
// Stop the regular sync service.
@@ -91,6 +98,13 @@ func (r *RegularSync) PeerStatuses() map[peer.ID]*pb.Status {
return r.statusTracker
}
// ClearPendingBlocks clears outstanding pending blocks waiting to be processed.
// This should be called upon a new finalization.
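// Usage sketch (hedged; the diff only documents the intent): a caller observing
// a new finalized checkpoint would reset the queue with
//
//	r.ClearPendingBlocks()
//
// so that stale pending blocks do not linger across finalization boundaries.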
func (r *RegularSync) ClearPendingBlocks() {
r.slotToPendingBlocks = make(map[uint64]*ethpb.BeaconBlock)
r.seenPendingBlocks = make(map[[32]byte]bool)
}
// Checker defines a struct which can verify whether a node is currently
// synchronizing a chain with the rest of peers in the network.
type Checker interface {

View File

@@ -38,9 +38,9 @@ func (r *RegularSync) registerSubscribers() {
ch := make(chan time.Time)
sub := r.chain.StateInitializedFeed().Subscribe(ch)
defer sub.Unsubscribe()
// Wait until chain start.
genesis := <-ch
if genesis.After(roughtime.Now()) {
time.Sleep(roughtime.Until(genesis))
}

View File

@@ -2,27 +2,18 @@ package sync
import (
"context"
"encoding/base64"
"time"
"github.com/gogo/protobuf/proto"
"github.com/karlseguin/ccache"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)
// pendingBlocks cache.
// TODO(3147): This cache value should represent a doubly linked list or some management to
// process pending blocks after a link to the canonical chain is found.
var pendingBlocks = ccache.New(ccache.Configure())
func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Message) error {
block := msg.(*ethpb.BeaconBlock)
headState := r.chain.HeadState()
// Ignore block older than last finalized checkpoint.
if block.Slot < helpers.StartSlot(headState.FinalizedCheckpoint.Epoch) {
log.Debugf("Received a block that's older than finalized checkpoint, %d < %d",
@@ -30,18 +21,18 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa
return nil
}
// Handle block when the parent is unknown.
if !r.db.HasBlock(ctx, bytesutil.ToBytes32(block.ParentRoot)) {
blockRoot, err := ssz.SigningRoot(block)
if err != nil {
return err
}
b64BlockRoot := base64.StdEncoding.EncodeToString(blockRoot[:])
pendingBlocks.Set(b64BlockRoot, block, 2*time.Hour)
blockRoot, err := ssz.SigningRoot(block)
if err != nil {
log.Errorf("Could not sign root block: %v", err)
return nil
}
// TODO(3147): Request parent block from peers
log.Warnf("Received a block from slot %d which we do not have the parent block in the database. "+
"Requesting missing blocks from peers is not yet implemented.", block.Slot)
// Handle block when the parent is unknown
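// The block is parked in the pending queue; processPendingBlocks will request
// the missing parent from a peer and process this block once the parent arrives.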
if !r.db.HasBlock(ctx, bytesutil.ToBytes32(block.ParentRoot)) {
r.pendingQueueLock.Lock()
r.slotToPendingBlocks[block.Slot] = block
r.seenPendingBlocks[blockRoot] = true
r.pendingQueueLock.Unlock()
return nil
}

View File

@@ -30,6 +30,9 @@ func (r *RegularSync) validateBeaconBlockPubSub(ctx context.Context, msg proto.M
// TODO(1332): Add blocks.VerifyAttestation before processing further.
// Discussion: https://github.com/ethereum/eth2.0-specs/issues/1332
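// Blocks already sitting in the pending queue fail validation here so they are
// not gossiped or processed again while their ancestors are still being fetched.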
if r.seenPendingBlocks[blockRoot] {
return false
}
if recentlySeenRoots.Get(string(blockRoot[:])) != nil || r.db.HasBlock(ctx, blockRoot) {
return false
}