Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-10 07:58:22 -05:00)
Re-arranges fetcher locks -> per peer level (#5906)

* fetcher locks -> per peer level
* typo
* removes redundant setting to nil
* Merge refs/heads/master into init-sync-rearrange-locks
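The core of this change is that the fetcher no longer serializes every request behind one fetcher-wide mutex while waiting out a peer's rate limit; instead it keeps one lock per peer and sweeps out locks that have not been touched for a while. Below is a minimal, self-contained sketch of that pattern under stated assumptions: the peerLock type, the accessed timestamp, and the getPeerLock/removeStalePeerLocks names mirror the diff that follows, while the enclosing fetcher type, the use of plain strings for peer IDs, and the main scaffolding are simplified and hypothetical, not the actual Prysm code.

// Sketch only: per-peer locks so one slow/rate-limited peer does not block others.
package main

import (
	"fmt"
	"sync"
	"time"
)

// peerLock pairs a mutex with the time it was last used, so stale
// entries can be swept out later.
type peerLock struct {
	sync.Mutex
	accessed time.Time
}

type fetcher struct {
	sync.Mutex // guards the peerLocks map itself
	peerLocks  map[string]*peerLock
}

// getPeerLock returns the lock for a peer, creating it on first use
// and refreshing its accessed timestamp.
func (f *fetcher) getPeerLock(pid string) *peerLock {
	f.Lock()
	defer f.Unlock()
	if l, ok := f.peerLocks[pid]; ok {
		l.accessed = time.Now()
		return l
	}
	l := &peerLock{accessed: time.Now()}
	f.peerLocks[pid] = l
	return l
}

// removeStalePeerLocks drops locks that have not been used for at least age.
func (f *fetcher) removeStalePeerLocks(age time.Duration) {
	f.Lock()
	defer f.Unlock()
	for pid, l := range f.peerLocks {
		if time.Since(l.accessed) >= age {
			delete(f.peerLocks, pid)
		}
	}
}

func main() {
	f := &fetcher{peerLocks: make(map[string]*peerLock)}
	l := f.getPeerLock("peer-1")
	l.Lock()
	// ... issue the request / wait out the rate limit for peer-1 only ...
	l.Unlock()
	f.removeStalePeerLocks(time.Hour)
	fmt.Println("locks:", len(f.peerLocks))
}

With this shape, locking l only contends with other requests to the same peer; requests to different peers proceed concurrently, which is exactly what the new test below checks.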
@@ -59,6 +59,8 @@ go_test(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/state:go_default_library",
@@ -74,6 +76,7 @@ go_test(
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
@@ -98,6 +101,7 @@ go_test(
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/state:go_default_library",
@@ -113,6 +117,7 @@ go_test(
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
@@ -36,6 +36,10 @@ const (
peersPercentagePerRequest = 0.75
// handshakePollingInterval is a polling interval for checking the number of received handshakes.
handshakePollingInterval = 5 * time.Second
// peerLocksPollingInterval is a polling interval for checking if there are stale peer locks.
peerLocksPollingInterval = 5 * time.Minute
// peerLockMaxAge is maximum time before stale lock is purged.
peerLockMaxAge = 60 * time.Minute
)

var (
@@ -61,11 +65,18 @@ type blocksFetcher struct {
p2p p2p.P2P
blocksPerSecond uint64
rateLimiter *leakybucket.Collector
peerLocks map[peer.ID]*peerLock
fetchRequests chan *fetchRequestParams
fetchResponses chan *fetchRequestResponse
quit chan struct{} // termination notifier
}

// peerLock restricts fetcher actions on per peer basis. Currently, used for rate limiting.
type peerLock struct {
sync.Mutex
accessed time.Time
}

// fetchRequestParams holds parameters necessary to schedule a fetch request.
type fetchRequestParams struct {
ctx context.Context // if provided, it is used instead of global fetcher's context
@@ -98,6 +109,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
p2p: cfg.p2p,
blocksPerSecond: uint64(blocksPerSecond),
rateLimiter: rateLimiter,
peerLocks: make(map[peer.ID]*peerLock),
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
quit: make(chan struct{}),
@@ -137,6 +149,21 @@ func (f *blocksFetcher) loop() {
close(f.fetchResponses)
}()

// Periodically remove stale peer locks.
go func() {
ticker := time.NewTicker(peerLocksPollingInterval)
for {
select {
case <-ticker.C:
f.removeStalePeerLocks(peerLockMaxAge)
case <-f.ctx.Done():
ticker.Stop()
return
}
}
}()

// Main loop.
for {
// Make sure there is are available peers before processing requests.
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
@@ -398,20 +425,31 @@ func (f *blocksFetcher) requestBlocks(
req *p2ppb.BeaconBlocksByRangeRequest,
pid peer.ID,
) ([]*eth.SignedBeaconBlock, error) {
f.Lock()
l := f.getPeerLock(pid)
if l == nil {
return nil, errors.New("cannot obtain lock")
}
l.Lock()
log.WithFields(logrus.Fields{
"peer": pid,
"start": req.StartSlot,
"count": req.Count,
"step": req.Step,
"remaining": f.rateLimiter.Remaining(pid.String()),
"peer": pid,
"start": req.StartSlot,
"count": req.Count,
"step": req.Step,
"capacity": f.rateLimiter.Remaining(pid.String()),
}).Debug("Requesting blocks")
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
time.Sleep(f.rateLimiter.TillEmpty(pid.String()))
timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String()))
select {
case <-f.ctx.Done():
timer.Stop()
return nil, errFetcherCtxIsDone
case <-timer.C:
// Peer has gathered enough capacity to be polled again.
}
}
f.rateLimiter.Add(pid.String(), int64(req.Count))
f.Unlock()
l.Unlock()
stream, err := f.p2p.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
if err != nil {
return nil, err
@@ -437,6 +475,34 @@ func (f *blocksFetcher) requestBlocks(
return resp, nil
}

// getPeerLock returns peer lock for a given peer. If lock is not found, it is created.
func (f *blocksFetcher) getPeerLock(pid peer.ID) *peerLock {
f.Lock()
defer f.Unlock()
if lock, ok := f.peerLocks[pid]; ok {
lock.accessed = roughtime.Now()
return lock
}
f.peerLocks[pid] = &peerLock{
Mutex: sync.Mutex{},
accessed: roughtime.Now(),
}
return f.peerLocks[pid]
}

// removeStalePeerLocks is a cleanup procedure which removes stale locks.
func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) {
f.Lock()
defer f.Unlock()
for peerID, lock := range f.peerLocks {
if time.Since(lock.accessed) >= age {
lock.Lock()
delete(f.peerLocks, peerID)
lock.Unlock()
}
}
}

// selectFailOverPeer randomly selects fail over peer from the list of available peers.
func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, []peer.ID, error) {
for i, pid := range peers {
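The other behavioral change in the requestBlocks hunk above is how the rate-limit wait is performed: the old code slept unconditionally with time.Sleep, while the new code arms a timer and selects on it together with the fetcher's context, so a shutdown is no longer held up by a goroutine waiting for a peer's capacity to refill. A minimal standalone sketch of that pattern follows; errFetcherCtxIsDone is the name used in the diff, whereas waitForCapacity and the fixed wait duration are illustrative stand-ins for the rate limiter's TillEmpty value.

// Sketch only: an interruptible wait that replaces a bare time.Sleep.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errFetcherCtxIsDone = errors.New("fetcher context is done")

// waitForCapacity blocks until wait has elapsed or ctx is cancelled,
// whichever comes first.
func waitForCapacity(ctx context.Context, wait time.Duration) error {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return errFetcherCtxIsDone
	case <-timer.C:
		// Peer has regained enough capacity to be polled again.
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	// Simulate a rate-limit wait that is longer than the context allows.
	if err := waitForCapacity(ctx, time.Second); err != nil {
		fmt.Println("wait aborted:", err)
	}
}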
@@ -9,17 +9,21 @@ import (
"time"

"github.com/kevinms/leakybucket-go"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pm "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/sirupsen/logrus"
@@ -362,7 +366,9 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
}
if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots); len(missing) > 0 {
missing := sliceutil.NotUint64(
sliceutil.IntersectionUint64(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots)
if len(missing) > 0 {
t.Errorf("Missing blocks at slots %v", missing)
}
})
@@ -457,7 +463,10 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
}
if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(chainConfig.expectedBlockSlots, receivedBlockSlots), chainConfig.expectedBlockSlots); len(missing) > 0 {
missing := sliceutil.NotUint64(
sliceutil.IntersectionUint64(chainConfig.expectedBlockSlots, receivedBlockSlots),
chainConfig.expectedBlockSlots)
if len(missing) > 0 {
t.Errorf("Missing blocks at slots %v", missing)
}
})
@@ -752,3 +761,209 @@ func TestBlocksFetcherFilterPeers(t *testing.T) {
})
}
}

func TestBlocksFetcherRequestBlocksRateLimitingLocks(t *testing.T) {
p1 := p2pt.NewTestP2P(t)
p2 := p2pt.NewTestP2P(t)
p3 := p2pt.NewTestP2P(t)
p1.Connect(p2)
p1.Connect(p3)
if len(p1.Host.Network().Peers()) != 2 {
t.Fatal("Expected peers to be connected")
}
req := &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
Count: 64,
}

topic := p2pm.RPCBlocksByRangeTopic
protocol := core.ProtocolID(topic + p2.Encoding().ProtocolSuffix())
streamHandlerFn := func(stream network.Stream) {
if err := stream.Close(); err != nil {
t.Error(err)
}
}
p2.Host.SetStreamHandler(protocol, streamHandlerFn)
p3.Host.SetStreamHandler(protocol, streamHandlerFn)

burstFactor := uint64(flags.Get().BlockBatchLimitBurstFactor)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1})
fetcher.rateLimiter = leakybucket.NewCollector(float64(req.Count), int64(req.Count*burstFactor), false)

wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
// Exhaust available rate for p2, so that rate limiting is triggered.
for i := uint64(0); i <= burstFactor; i++ {
if i == burstFactor {
// The next request will trigger rate limiting for p2. Now, allow concurrent
// p3 data request (p3 shouldn't be rate limited).
time.AfterFunc(1*time.Second, func() {
wg.Done()
})
}
_, err := fetcher.requestBlocks(ctx, req, p2.PeerID())
if err != nil && err != errFetcherCtxIsDone {
t.Error(err)
}
}
}()

// Wait until p2 exhausts its rate and is spinning on rate limiting timer.
wg.Wait()

// The next request should NOT trigger rate limiting as rate is exhausted for p2, not p3.
ch := make(chan struct{}, 1)
go func() {
_, err := fetcher.requestBlocks(ctx, req, p3.PeerID())
if err != nil {
t.Error(err)
}
ch <- struct{}{}
}()
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
t.Error("p3 takes too long to respond: lock contention")
case <-ch:
// p3 responded w/o waiting for rate limiter's lock (on which p2 spins).
}
}

func TestBlocksFetcherRemoveStalePeerLocks(t *testing.T) {
type peerData struct {
peerID peer.ID
accessed time.Time
}
tests := []struct {
name string
age time.Duration
peersIn []peerData
peersOut []peerData
}{
{
name: "empty map",
age: peerLockMaxAge,
peersIn: []peerData{},
peersOut: []peerData{},
},
{
name: "no stale peer locks",
age: peerLockMaxAge,
peersIn: []peerData{
{
peerID: "abc",
accessed: roughtime.Now(),
},
{
peerID: "def",
accessed: roughtime.Now(),
},
{
peerID: "ghi",
accessed: roughtime.Now(),
},
},
peersOut: []peerData{
{
peerID: "abc",
accessed: roughtime.Now(),
},
{
peerID: "def",
accessed: roughtime.Now(),
},
{
peerID: "ghi",
accessed: roughtime.Now(),
},
},
},
{
name: "one stale peer lock",
age: peerLockMaxAge,
peersIn: []peerData{
{
peerID: "abc",
accessed: roughtime.Now(),
},
{
peerID: "def",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
{
peerID: "ghi",
accessed: roughtime.Now(),
},
},
peersOut: []peerData{
{
peerID: "abc",
accessed: roughtime.Now(),
},
{
peerID: "ghi",
accessed: roughtime.Now(),
},
},
},
{
name: "all peer locks are stale",
age: peerLockMaxAge,
peersIn: []peerData{
{
peerID: "abc",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
{
peerID: "def",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
{
peerID: "ghi",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
},
peersOut: []peerData{},
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{})

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fetcher.peerLocks = make(map[peer.ID]*peerLock, len(tt.peersIn))
for _, data := range tt.peersIn {
fetcher.peerLocks[data.peerID] = &peerLock{
Mutex: sync.Mutex{},
accessed: data.accessed,
}
}

fetcher.removeStalePeerLocks(tt.age)

var peersOut1, peersOut2 []peer.ID
for _, data := range tt.peersOut {
peersOut1 = append(peersOut1, data.peerID)
}
for peerID := range fetcher.peerLocks {
peersOut2 = append(peersOut2, peerID)
}
sort.SliceStable(peersOut1, func(i, j int) bool {
return peersOut1[i].String() < peersOut1[j].String()
})
sort.SliceStable(peersOut2, func(i, j int) bool {
return peersOut2[i].String() < peersOut2[j].String()
})
if !reflect.DeepEqual(peersOut1, peersOut2) {
t.Errorf("unexpected peers map, want: %#v, got: %#v", peersOut1, peersOut2)
}
})
}
}