Init sync pre queue (#5098)

* fixes a data race by isolating critical sections: the shared test caches are now guarded by a sync.RWMutex

* minor refactoring: replaces blocking channel operations with context-aware selects (see the sketch below)

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Victor Farazdagi
2020-03-14 21:21:07 +03:00
committed by GitHub
parent f17818b1c0
commit 1137403e4b
3 changed files with 96 additions and 90 deletions
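
Both bullets come down to the same idiom in the diff below: channel sends that used to be unconditional are wrapped in a select on the owning context, and the shared test caches move behind a mutex. A minimal sketch of the send idiom, with illustrative names (sketch, result, sendOrAbort are not part of the change):

package sketch

import "context"

// result stands in for *fetchRequestResponse from the diff below.
type result struct {
	err error
}

// sendOrAbort delivers res unless ctx is already done. Without the select,
// a worker goroutine could block forever on a channel whose reader has quit.
func sendOrAbort(ctx context.Context, out chan<- *result, res *result) {
	select {
	case <-ctx.Done(): // owner is shutting down; drop the response instead of blocking
	case out <- res:
	}
}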

View File

@@ -74,7 +74,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
rateLimiter := leakybucket.NewCollector(
allowedBlocksPerSecond, /* rate */
allowedBlocksPerSecond, /* capacity */
false /* deleteEmptyBuckets */)
false /* deleteEmptyBuckets */)
return &blocksFetcher{
ctx: ctx,
@@ -83,7 +83,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
p2p: cfg.p2p,
rateLimiter: rateLimiter,
fetchRequests: make(chan *fetchRequestParams, fetchRequestsBuffer),
fetchResponses: make(chan *fetchRequestResponse),
fetchResponses: make(chan *fetchRequestResponse, fetchRequestsBuffer),
quit: make(chan struct{}),
}
}
@@ -130,8 +130,10 @@ func (f *blocksFetcher) loop() {
wg.Add(1)
go func() {
defer wg.Done()
f.handleRequest(req.ctx, req.start, req.count)
select {
case <-f.ctx.Done():
case f.fetchResponses <- f.handleRequest(req.ctx, req.start, req.count):
}
}()
}
}
@@ -139,82 +141,65 @@ func (f *blocksFetcher) loop() {
// scheduleRequest adds request to incoming queue.
func (f *blocksFetcher) scheduleRequest(ctx context.Context, start, count uint64) error {
if ctx.Err() != nil {
return ctx.Err()
}
request := &fetchRequestParams{
ctx: ctx,
start: start,
count: count,
}
select {
case <-f.ctx.Done():
return errFetcherCtxIsDone
default:
f.fetchRequests <- &fetchRequestParams{
ctx: ctx,
start: start,
count: count,
}
case f.fetchRequests <- request:
}
return nil
}
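
A hedged sketch of the caller side, which is outside this diff: with the select above, a send on a full fetchRequests queue is released as soon as f.ctx is cancelled, and the caller sees errFetcherCtxIsDone (or its own ctx.Err()) instead of hanging. The helper below is illustrative only:

// scheduleBatches is a hypothetical caller; the name and batching are illustrative.
func scheduleBatches(ctx context.Context, f *blocksFetcher, start, batches, batchSize uint64) error {
	for i := uint64(0); i < batches; i++ {
		if err := f.scheduleRequest(ctx, start+i*batchSize, batchSize); err != nil {
			// Either the request context or the fetcher itself is done; stop scheduling.
			return err
		}
	}
	return nil
}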
// handleRequest parses a fetch request and returns the assembled response.
func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64) {
func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64) *fetchRequestResponse {
ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest")
defer span.End()
// sendResponse ensures that response is not sent to a closed channel (when context is done).
sendResponse := func(ctx context.Context, response *fetchRequestResponse) {
if ctx.Err() != nil {
log.WithError(ctx.Err()).Debug("Can not send fetch request response")
return
}
f.fetchResponses <- response
response := &fetchRequestResponse{
start: start,
count: count,
blocks: []*eth.SignedBeaconBlock{},
err: nil,
peers: []peer.ID{},
}
if ctx.Err() != nil {
sendResponse(ctx, nil)
return
response.err = ctx.Err()
return response
}
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
root, finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
log.WithFields(logrus.Fields{
"start": start,
"count": count,
"finalizedEpoch": finalizedEpoch,
"numPeers": len(peers),
}).Debug("Block fetcher received a request")
if len(peers) == 0 {
log.Error(errNoPeersAvailable)
return
response.err = errNoPeersAvailable
return response
}
// Short circuit requests whose start slot is beyond the highest finalized slot, which would otherwise loop forever.
highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
if start > highestFinalizedSlot {
log.WithError(errStartSlotIsTooHigh).Debug("Block fetch request failed")
sendResponse(ctx, &fetchRequestResponse{
start: start,
count: count,
err: errStartSlotIsTooHigh,
})
return
response.err = errStartSlotIsTooHigh
return response
}
resp, err := f.collectPeerResponses(ctx, root, finalizedEpoch, start, 1, count, peers)
blocks, err := f.collectPeerResponses(ctx, root, finalizedEpoch, start, 1, count, peers)
if err != nil {
log.WithError(err).Debug("Block fetch request failed")
sendResponse(ctx, &fetchRequestResponse{
start: start,
count: count,
err: err,
})
return
response.err = err
return response
}
sendResponse(ctx, &fetchRequestResponse{
start: start,
count: count,
blocks: resp,
peers: peers,
})
response.blocks = blocks
response.peers = peers
return response
}
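
handleRequest now returns the response instead of pushing it onto fetchResponses itself; the goroutine in loop() owns the delivery. The reader on the other end of fetchResponses is not part of this diff, but a hypothetical consumer would branch on the embedded err rather than on log output, roughly:

// drainResponses is an illustrative consumer; the real one lives outside this diff.
func drainResponses(f *blocksFetcher, handle func([]*eth.SignedBeaconBlock, []peer.ID)) {
	for {
		select {
		case <-f.ctx.Done():
			return
		case resp := <-f.fetchResponses:
			if resp.err != nil {
				log.WithError(resp.err).Debug("Fetch request failed")
				continue
			}
			handle(resp.blocks, resp.peers)
		}
	}
}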
// collectPeerResponses orchestrates block fetching from the available peers.
@@ -268,12 +253,6 @@ func (f *blocksFetcher) collectPeerResponses(
// Spread load evenly among available peers.
perPeerCount := count / uint64(len(peers))
remainder := int(count % uint64(len(peers)))
log.WithFields(logrus.Fields{
"start": start,
"count": count,
"perPeerCount": perPeerCount,
"remainder": remainder,
}).Debug("Distribute request among available peers")
for i, pid := range peers {
start, step := start+uint64(i)*step, step*uint64(len(peers))
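
A worked sketch of this split, together with the remainder adjustment in the next hunk (distribute and its return shape are illustrative; the step of 1 matches what handleRequest passes in):

// distribute mirrors the per-peer split: peer i asks for slots start+i,
// start+i+N, start+i+2N, ... where N = len(peers), and the first `remainder`
// peers absorb one extra block each.
func distribute(start, count uint64, numPeers int) (starts, steps, counts []uint64) {
	perPeerCount := count / uint64(numPeers)
	remainder := int(count % uint64(numPeers))
	for i := 0; i < numPeers; i++ {
		c := perPeerCount
		if i < remainder {
			c++
		}
		starts = append(starts, start+uint64(i))
		steps = append(steps, uint64(numPeers))
		counts = append(counts, c)
	}
	return starts, steps, counts
}

// Example: start=100, count=10, 3 peers:
//   peer 0: start 100, step 3, count 4 -> slots 100, 103, 106, 109
//   peer 1: start 101, step 3, count 3 -> slots 101, 104, 107
//   peer 2: start 102, step 3, count 3 -> slots 102, 105, 108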
@@ -283,10 +262,10 @@ func (f *blocksFetcher) collectPeerResponses(
if i < remainder {
count++
}
// Asking for no blocks may cause the client to hang. This should never happen and
// the peer may return an error anyway, but we'll ask for at least one block.
// Asking for no blocks may cause the client to hang.
if count == 0 {
count++
p2pRequests.Done()
continue
}
go func(ctx context.Context, pid peer.ID) {
@@ -294,16 +273,24 @@ func (f *blocksFetcher) collectPeerResponses(
blocks, err := f.requestBeaconBlocksByRange(ctx, pid, root, start, step, count)
if err != nil {
errChan <- err
return
select {
case <-ctx.Done():
case errChan <- err:
return
}
}
select {
case <-ctx.Done():
case blocksChan <- blocks:
}
blocksChan <- blocks
}(ctx, pid)
}
var unionRespBlocks []*eth.SignedBeaconBlock
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errChan:
return nil, err
case resp, ok := <-blocksChan:
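
The skipped-peer branch above also switched from padding count to calling p2pRequests.Done() before continue: every peer counted into the wait group must release its slot exactly once, or the batch never completes. A condensed, illustrative sketch of that invariant plus the two-sided context guard (requestFromPeers and its callbacks are not from the diff):

// requestFromPeers sketches the WaitGroup balance behind the skipped-peer change:
// a peer whose share rounds down to zero must still release its slot, or the
// goroutine waiting on wg below never signals completion.
func requestFromPeers(ctx context.Context, peers []peer.ID, countFor func(i int) uint64, request func(pid peer.ID) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // releases any sender still blocked once we return early
	errChan := make(chan error)
	var wg sync.WaitGroup
	wg.Add(len(peers))
	for i, pid := range peers {
		if countFor(i) == 0 {
			wg.Done() // skip this peer without asking for zero blocks
			continue
		}
		go func(pid peer.ID) {
			defer wg.Done()
			if err := request(pid); err != nil {
				select {
				case <-ctx.Done(): // collector is gone; don't block on the send
				case errChan <- err:
				}
			}
		}(pid)
	}
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errChan:
		return err
	case <-done:
		return nil
	}
}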
@@ -360,7 +347,6 @@ func (f *blocksFetcher) requestBeaconBlocksByRange(
return f.requestBeaconBlocksByRange(ctx, newPID, root, start, step, count)
}
log.WithField("peer", pid).WithField("count", len(resp)).Debug("Received blocks")
return resp, nil
}
@@ -384,7 +370,7 @@ func (f *blocksFetcher) requestBlocks(
}).Debug("Requesting blocks")
stream, err := f.p2p.Send(ctx, req, pid)
if err != nil {
return nil, errors.Wrap(err, "failed to send request to peer")
return nil, err
}
defer stream.Close()
@@ -395,7 +381,7 @@ func (f *blocksFetcher) requestBlocks(
break
}
if err != nil {
return nil, errors.Wrap(err, "failed to read chunked block")
return nil, err
}
resp = append(resp, blk)
}

View File

@@ -283,13 +283,15 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
initializeRootCache(tt.expectedBlockSlots, t)
cache.initializeRootCache(tt.expectedBlockSlots, t)
beaconDB := dbtest.SetupDB(t)
p := p2pt.NewTestP2P(t)
connectPeers(t, p, tt.peers, p.Peers())
genesisRoot := rootCache[0]
cache.RLock()
genesisRoot := cache.rootCache[0]
cache.RUnlock()
err := beaconDB.SaveBlock(context.Background(), &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
@@ -439,12 +441,10 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
},
}
hook := logTest.NewGlobal()
mc, p2p, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
defer dbtest.TeardownDB(t, beaconDB)
t.Run("context cancellation", func(t *testing.T) {
hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
@@ -452,13 +452,13 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
})
cancel()
fetcher.handleRequest(ctx, 1, blockBatchSize)
testutil.AssertLogsContain(t, hook, "Can not send fetch request response")
testutil.AssertLogsContain(t, hook, "context canceled")
response := fetcher.handleRequest(ctx, 1, blockBatchSize)
if response.err == nil {
t.Errorf("expected error: %v", errFetcherCtxIsDone)
}
})
t.Run("receive blocks", func(t *testing.T) {
hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
@@ -467,7 +467,13 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
})
requestCtx, _ := context.WithTimeout(context.Background(), 2*time.Second)
go fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchSize /* count */)
go func() {
response := fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchSize /* count */)
select {
case <-ctx.Done():
case fetcher.fetchResponses <- response:
}
}()
var blocks []*eth.SignedBeaconBlock
select {
@@ -483,7 +489,6 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
if len(blocks) != blockBatchSize {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks))
}
testutil.AssertLogsContain(t, hook, "Received blocks")
var receivedBlockSlots []uint64
for _, blk := range blocks {
@@ -647,12 +652,14 @@ func TestBlocksFetcherSelectFailOverPeer(t *testing.T) {
}
func initializeTestServices(t *testing.T, blocks []uint64, peers []*peerData) (*mock.ChainService, *p2pt.TestP2P, db.Database) {
initializeRootCache(blocks, t)
cache.initializeRootCache(blocks, t)
beaconDB := dbtest.SetupDB(t)
p := p2pt.NewTestP2P(t)
connectPeers(t, p, peers, p.Peers())
genesisRoot := rootCache[0]
cache.RLock()
genesisRoot := cache.rootCache[0]
cache.RUnlock()
err := beaconDB.SaveBlock(context.Background(), &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"
@@ -17,7 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
beaconsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -26,8 +27,13 @@ import (
"github.com/sirupsen/logrus"
)
var rootCache map[uint64][32]byte
var parentSlotCache map[uint64]uint64
type testCache struct {
sync.RWMutex
rootCache map[uint64][32]byte
parentSlotCache map[uint64]uint64
}
var cache = &testCache{}
type peerData struct {
blocks []uint64 // slots that peer has blocks
@@ -235,13 +241,15 @@ func TestRoundRobinSync(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
initializeRootCache(tt.expectedBlockSlots, t)
cache.initializeRootCache(tt.expectedBlockSlots, t)
p := p2pt.NewTestP2P(t)
beaconDB := dbtest.SetupDB(t)
connectPeers(t, p, tt.peers, p.Peers())
genesisRoot := rootCache[0]
cache.RLock()
genesisRoot := cache.rootCache[0]
cache.RUnlock()
err := beaconDB.SaveBlock(context.Background(), &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
@@ -330,7 +338,9 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus
if (slot-req.StartSlot)%req.Step != 0 {
continue
}
parentRoot := rootCache[parentSlotCache[slot]]
cache.RLock()
parentRoot := cache.rootCache[cache.parentSlotCache[slot]]
cache.RUnlock()
blk := &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: slot,
@@ -352,7 +362,7 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus
}
for i := 0; i < len(ret); i++ {
if err := sync.WriteChunk(stream, peer.Encoding(), ret[i]); err != nil {
if err := beaconsync.WriteChunk(stream, peer.Encoding(), ret[i]); err != nil {
t.Error(err)
}
}
@@ -398,9 +408,12 @@ func makeSequence(start, end uint64) []uint64 {
return seq
}
func initializeRootCache(reqSlots []uint64, t *testing.T) {
rootCache = make(map[uint64][32]byte)
parentSlotCache = make(map[uint64]uint64)
func (c *testCache) initializeRootCache(reqSlots []uint64, t *testing.T) {
c.Lock()
defer c.Unlock()
c.rootCache = make(map[uint64][32]byte)
c.parentSlotCache = make(map[uint64]uint64)
parentSlot := uint64(0)
genesisBlock := &eth.BeaconBlock{
Slot: 0,
@@ -409,7 +422,7 @@ func initializeRootCache(reqSlots []uint64, t *testing.T) {
if err != nil {
t.Fatal(err)
}
rootCache[0] = genesisRoot
c.rootCache[0] = genesisRoot
parentRoot := genesisRoot
for _, slot := range reqSlots {
currentBlock := &eth.BeaconBlock{
@@ -420,8 +433,8 @@ func initializeRootCache(reqSlots []uint64, t *testing.T) {
if err != nil {
t.Fatal(err)
}
rootCache[slot] = parentRoot
parentSlotCache[slot] = parentSlot
c.rootCache[slot] = parentRoot
c.parentSlotCache[slot] = parentSlot
parentSlot = slot
}
}
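
Every read site now repeats the RLock/RUnlock pair around a map lookup; a small accessor on testCache would keep the locking in one place. A possible follow-up sketch, not part of this change (rootAt and parentRootOf are illustrative names):

// rootAt returns the cached root for a slot while holding the read lock.
func (c *testCache) rootAt(slot uint64) [32]byte {
	c.RLock()
	defer c.RUnlock()
	return c.rootCache[slot]
}

// parentRootOf resolves a slot's parent root via parentSlotCache under the same lock.
func (c *testCache) parentRootOf(slot uint64) [32]byte {
	c.RLock()
	defer c.RUnlock()
	return c.rootCache[c.parentSlotCache[slot]]
}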