mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
2 Commits
develop
...
backfill-i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2049422a9d | ||
|
|
8c7f4d8697 |
@@ -128,6 +128,7 @@ type BeaconNode struct {
|
||||
slasherEnabled bool
|
||||
lcStore *lightclient.Store
|
||||
ConfigOptions []params.Option
|
||||
syncToggle *regularsync.ServiceToggler
|
||||
}
|
||||
|
||||
// New creates a new node instance, sets up configuration options, and registers
|
||||
@@ -161,6 +162,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
|
||||
initialSyncComplete: make(chan struct{}),
|
||||
syncChecker: &initialsync.SyncChecker{},
|
||||
slasherEnabled: cliCtx.Bool(flags.SlasherFlag.Name),
|
||||
syncToggle: regularsync.NewServiceToggler(),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@@ -234,6 +236,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
|
||||
beacon.BackfillOpts,
|
||||
backfill.WithVerifierWaiter(beacon.verifyInitWaiter),
|
||||
backfill.WithInitSyncWaiter(initSyncWaiter(ctx, beacon.initialSyncComplete)),
|
||||
backfill.WithServiceToggle(beacon.syncToggle),
|
||||
)
|
||||
|
||||
if err := registerServices(cliCtx, beacon, synchronizer, bfs); err != nil {
|
||||
@@ -371,7 +374,7 @@ func registerServices(cliCtx *cli.Context, beacon *BeaconNode, synchronizer *sta
|
||||
}
|
||||
|
||||
log.Debugln("Registering Initial Sync Service")
|
||||
if err := beacon.registerInitialSyncService(beacon.initialSyncComplete); err != nil {
|
||||
if err := beacon.registerInitialSyncService(beacon.initialSyncComplete, beacon.syncToggle); err != nil {
|
||||
return errors.Wrap(err, "could not register initial sync service")
|
||||
}
|
||||
|
||||
@@ -848,7 +851,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
|
||||
return b.services.RegisterService(rs)
|
||||
}
|
||||
|
||||
func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {
|
||||
func (b *BeaconNode) registerInitialSyncService(complete chan struct{}, toggler *regularsync.ServiceToggler) error {
|
||||
var chainService *blockchain.Service
|
||||
if err := b.services.FetchService(&chainService); err != nil {
|
||||
return err
|
||||
@@ -857,6 +860,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {
|
||||
opts := []initialsync.Option{
|
||||
initialsync.WithVerifierWaiter(b.verifyInitWaiter),
|
||||
initialsync.WithSyncChecker(b.syncChecker),
|
||||
initialsync.WithServiceToggle(toggler),
|
||||
}
|
||||
is := initialsync.NewService(b.ctx, &initialsync.Config{
|
||||
DB: b.db,
|
||||
|
||||
@@ -14,7 +14,6 @@ go_library(
|
||||
"doc.go",
|
||||
"error.go",
|
||||
"fork_watcher.go",
|
||||
"fuzz_exports.go", # keep
|
||||
"log.go",
|
||||
"metrics.go",
|
||||
"options.go",
|
||||
@@ -49,6 +48,7 @@ go_library(
|
||||
"subscriber_sync_committee_message.go",
|
||||
"subscriber_sync_contribution_proof.go",
|
||||
"subscription_topic_handler.go",
|
||||
"toggle.go",
|
||||
"validate_aggregate_proof.go",
|
||||
"validate_attester_slashing.go",
|
||||
"validate_beacon_attestation.go",
|
||||
@@ -63,11 +63,7 @@ go_library(
|
||||
"validate_voluntary_exit.go",
|
||||
],
|
||||
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/sync",
|
||||
visibility = [
|
||||
"//beacon-chain:__subpackages__",
|
||||
"//cmd:__subpackages__",
|
||||
"//testing:__subpackages__",
|
||||
],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//async:go_default_library",
|
||||
"//async/abool:go_default_library",
|
||||
@@ -161,7 +157,6 @@ go_library(
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
size = "small",
|
||||
srcs = [
|
||||
"batch_verifier_test.go",
|
||||
"blobs_test.go",
|
||||
@@ -200,6 +195,7 @@ go_test(
|
||||
"subscription_topic_handler_test.go",
|
||||
"sync_fuzz_test.go",
|
||||
"sync_test.go",
|
||||
"toggle_test.go",
|
||||
"validate_aggregate_proof_test.go",
|
||||
"validate_attester_slashing_test.go",
|
||||
"validate_beacon_attestation_test.go",
|
||||
@@ -214,7 +210,6 @@ go_test(
|
||||
"validate_voluntary_exit_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
shard_count = 4,
|
||||
deps = [
|
||||
"//async/abool:go_default_library",
|
||||
"//async/event:go_default_library",
|
||||
|
||||
@@ -45,11 +45,12 @@ type p2pBatchWorkerPool struct {
|
||||
endSeq []batch
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
toggle *sync.ServiceToggler
|
||||
}
|
||||
|
||||
var _ batchWorkerPool = &p2pBatchWorkerPool{}
|
||||
|
||||
func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
|
||||
func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int, tg *sync.ServiceToggler) *p2pBatchWorkerPool {
|
||||
nw := defaultNewWorker(p)
|
||||
return &p2pBatchWorkerPool{
|
||||
newWorker: nw,
|
||||
@@ -59,6 +60,7 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
|
||||
fromWorkers: make(chan batch),
|
||||
maxBatches: maxBatches,
|
||||
shutdownErr: make(chan error),
|
||||
toggle: tg,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +117,7 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
|
||||
// This ticker exists to periodically break out of the channel select
|
||||
// to retry failed assignments.
|
||||
case b := <-p.fromWorkers:
|
||||
p.toggle.Release(sync.ToggleGroupBackfill)
|
||||
pid := b.busy
|
||||
busy[pid] = false
|
||||
if b.state == batchBlobSync {
|
||||
@@ -143,6 +146,9 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
|
||||
return
|
||||
}
|
||||
for _, pid := range assigned {
|
||||
if err := p.toggle.Acquire(p.ctx, sync.ToggleGroupBackfill); err != nil {
|
||||
p.shutdown(err)
|
||||
}
|
||||
if err := todo[0].waitUntilReady(p.ctx); err != nil {
|
||||
log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
|
||||
p.shutdown(p.ctx.Err())
|
||||
|
||||
@@ -42,7 +42,8 @@ func TestPoolDetectAllEnded(t *testing.T) {
|
||||
p2p := p2ptest.NewTestP2P(t)
|
||||
ctx := t.Context()
|
||||
ma := &mockAssigner{}
|
||||
pool := newP2PBatchWorkerPool(p2p, nw)
|
||||
tg := sync.NewServiceToggler()
|
||||
pool := newP2PBatchWorkerPool(p2p, nw, tg)
|
||||
st, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
keys, err := st.PublicKeys()
|
||||
|
||||
@@ -41,6 +41,7 @@ type Service struct {
|
||||
blobStore *filesystem.BlobStorage
|
||||
initSyncWaiter func() error
|
||||
complete chan struct{}
|
||||
toggler *sync.ServiceToggler
|
||||
}
|
||||
|
||||
var _ runtime.Service = (*Service)(nil)
|
||||
@@ -104,6 +105,15 @@ func WithInitSyncWaiter(w func() error) ServiceOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithServiceToggle sets the ServiceToggler, which is used to coordinate
|
||||
// with backfill, so that only one service runs at a time.
|
||||
func WithServiceToggle(toggler *sync.ServiceToggler) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
s.toggler = toggler
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// InitializerWaiter is an interface that is satisfied by verification.InitializerWaiter.
|
||||
// Using this interface enables node init to satisfy this requirement for the backfill service
|
||||
// while also allowing backfill to mock it in tests.
|
||||
@@ -157,7 +167,7 @@ func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage,
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
s.pool = newP2PBatchWorkerPool(p, s.nWorkers)
|
||||
s.pool = newP2PBatchWorkerPool(p, s.nWorkers, s.toggler)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@@ -72,6 +72,7 @@ type Service struct {
|
||||
newDataColumnsVerifier verification.NewDataColumnsVerifier
|
||||
ctxMap sync.ContextByteVersions
|
||||
genesisTime time.Time
|
||||
toggler *sync.ServiceToggler
|
||||
}
|
||||
|
||||
// Option is a functional option for the initial-sync Service.
|
||||
@@ -93,6 +94,14 @@ func WithSyncChecker(checker *SyncChecker) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithServiceToggle sets the ServiceToggler, which is used to coordinate
|
||||
// with backfill, so that only one service runs at a time.
|
||||
func WithServiceToggle(toggler *sync.ServiceToggler) Option {
|
||||
return func(s *Service) {
|
||||
s.toggler = toggler
|
||||
}
|
||||
}
|
||||
|
||||
// SyncChecker allows other services to check the current status of
|
||||
// initial-sync and use that internally in their service.
|
||||
type SyncChecker struct {
|
||||
@@ -159,22 +168,25 @@ func (s *Service) Start() {
|
||||
log.Debug("Exiting Initial Sync Service")
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.beginSync(); err != nil {
|
||||
log.WithError(err).Error("Failed to get acquire sync toggle lock")
|
||||
return
|
||||
}
|
||||
defer s.completeSync()
|
||||
s.genesisTime = gt
|
||||
// Exit entering round-robin sync if we require 0 peers to sync.
|
||||
if flags.Get().MinimumSyncPeers == 0 {
|
||||
s.markSynced()
|
||||
log.WithField("genesisTime", s.genesisTime).Info("Due to number of peers required for sync being set at 0, entering regular sync immediately.")
|
||||
return
|
||||
}
|
||||
if s.genesisTime.After(prysmTime.Now()) {
|
||||
s.markSynced()
|
||||
log.WithField("genesisTime", s.genesisTime).Info("Genesis time has not arrived - not syncing")
|
||||
return
|
||||
}
|
||||
currentSlot := clock.CurrentSlot()
|
||||
if slots.ToEpoch(currentSlot) == 0 {
|
||||
log.WithField("genesisTime", s.genesisTime).Info("Chain started within the last epoch - not syncing")
|
||||
s.markSynced()
|
||||
return
|
||||
}
|
||||
s.chainStarted.Set()
|
||||
@@ -183,7 +195,6 @@ func (s *Service) Start() {
|
||||
// Are we already in sync, or close to it?
|
||||
if slots.ToEpoch(s.cfg.Chain.HeadSlot()) == slots.ToEpoch(currentSlot) {
|
||||
log.Info("Already synced to the current chain head")
|
||||
s.markSynced()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -204,7 +215,18 @@ func (s *Service) Start() {
|
||||
panic(err) // lint:nopanic -- Unexpected error. This should probably be surfaced with a returned error.
|
||||
}
|
||||
log.WithField("slot", s.cfg.Chain.HeadSlot()).Info("Synced up to")
|
||||
s.markSynced()
|
||||
}
|
||||
|
||||
func (s *Service) beginSync() error {
|
||||
s.synced.UnSet()
|
||||
return s.toggler.Acquire(s.ctx, sync.ToggleGroupRangeSync)
|
||||
}
|
||||
|
||||
// completeSync marks node as synced and notifies feed listeners.
|
||||
func (s *Service) completeSync() {
|
||||
s.toggler.Release(sync.ToggleGroupRangeSync)
|
||||
s.synced.Set()
|
||||
close(s.cfg.InitialSyncComplete)
|
||||
}
|
||||
|
||||
// fetchOriginSidecars fetches origin sidecars
|
||||
@@ -289,8 +311,10 @@ func (s *Service) Resync() error {
|
||||
}
|
||||
|
||||
// Set it to false since we are syncing again.
|
||||
s.synced.UnSet()
|
||||
defer func() { s.synced.Set() }() // Reset it at the end of the method.
|
||||
if err := s.beginSync(); err != nil {
|
||||
return errors.Wrap(err, "begin sync")
|
||||
}
|
||||
defer func() { s.completeSync() }() // Reset it at the end of the method.
|
||||
|
||||
_, err = s.waitForMinimumPeers()
|
||||
if err != nil {
|
||||
@@ -325,12 +349,6 @@ func (s *Service) waitForMinimumPeers() ([]peer.ID, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// markSynced marks node as synced and notifies feed listeners.
|
||||
func (s *Service) markSynced() {
|
||||
s.synced.Set()
|
||||
close(s.cfg.InitialSyncComplete)
|
||||
}
|
||||
|
||||
func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
|
||||
r := blk.Root()
|
||||
if blk.Version() < version.Deneb {
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
testp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
prysmSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
bcsync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
@@ -174,6 +174,7 @@ func TestService_InitStartStop(t *testing.T) {
|
||||
StateNotifier: &mock.MockStateNotifier{},
|
||||
InitialSyncComplete: make(chan struct{}),
|
||||
})
|
||||
s.toggler = bcsync.NewServiceToggler()
|
||||
s.verifierWaiter = verification.NewInitializerWaiter(gs, nil, nil)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.NotNil(t, s)
|
||||
@@ -218,6 +219,7 @@ func TestService_waitForStateInitialization(t *testing.T) {
|
||||
genesisChan: make(chan time.Time),
|
||||
}
|
||||
s.verifierWaiter = verification.NewInitializerWaiter(cs, nil, nil)
|
||||
s.toggler = bcsync.NewServiceToggler()
|
||||
return s, cs
|
||||
}
|
||||
|
||||
@@ -318,16 +320,18 @@ func TestService_markSynced(t *testing.T) {
|
||||
StateNotifier: mc.StateNotifier(),
|
||||
InitialSyncComplete: make(chan struct{}),
|
||||
})
|
||||
s.toggler = bcsync.NewServiceToggler()
|
||||
require.NotNil(t, s)
|
||||
assert.Equal(t, false, s.chainStarted.IsSet())
|
||||
assert.Equal(t, false, s.synced.IsSet())
|
||||
assert.Equal(t, true, s.Syncing())
|
||||
assert.NoError(t, s.Status())
|
||||
require.NoError(t, s.beginSync())
|
||||
s.chainStarted.Set()
|
||||
assert.ErrorContains(t, "syncing", s.Status())
|
||||
|
||||
go func() {
|
||||
s.markSynced()
|
||||
s.completeSync()
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -398,12 +402,14 @@ func TestService_Resync(t *testing.T) {
|
||||
mc = tt.chainService()
|
||||
}
|
||||
s := NewService(ctx, &Config{
|
||||
DB: beaconDB,
|
||||
P2P: p,
|
||||
Chain: mc,
|
||||
StateNotifier: mc.StateNotifier(),
|
||||
BlobStorage: filesystem.NewEphemeralBlobStorage(t),
|
||||
DB: beaconDB,
|
||||
P2P: p,
|
||||
Chain: mc,
|
||||
StateNotifier: mc.StateNotifier(),
|
||||
BlobStorage: filesystem.NewEphemeralBlobStorage(t),
|
||||
InitialSyncComplete: make(chan struct{}),
|
||||
})
|
||||
s.toggler = bcsync.NewServiceToggler()
|
||||
assert.NotNil(t, s)
|
||||
s.genesisTime = mc.Genesis
|
||||
assert.Equal(t, primitives.Slot(0), s.cfg.Chain.HeadSlot())
|
||||
@@ -793,7 +799,7 @@ func TestFetchOriginColumns(t *testing.T) {
|
||||
// Create a block with blob commitments and sidecars
|
||||
roBlock, _, verifiedRoSidecars := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
|
||||
|
||||
ctxMap, err := prysmSync.ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
|
||||
ctxMap, err := bcsync.ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
service := &Service{
|
||||
@@ -816,7 +822,7 @@ func TestFetchOriginColumns(t *testing.T) {
|
||||
assert.DeepEqual(t, expectedRequests[attempt], actualRequest)
|
||||
|
||||
for _, column := range toRespondByAttempt[attempt] {
|
||||
err = prysmSync.WriteDataColumnSidecarChunk(stream, clock, other.Encoding(), verifiedRoSidecars[column].DataColumnSidecar)
|
||||
err = bcsync.WriteDataColumnSidecarChunk(stream, clock, other.Encoding(), verifiedRoSidecars[column].DataColumnSidecar)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
|
||||
143
beacon-chain/sync/toggle.go
Normal file
143
beacon-chain/sync/toggle.go
Normal file
@@ -0,0 +1,143 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// ToggleGroup represents a service that can be toggled.
|
||||
type ToggleGroup uint32
|
||||
|
||||
const (
|
||||
NilService ToggleGroup = iota
|
||||
// ToggleGroupRangeSync represents the initial-sync service.
|
||||
ToggleGroupRangeSync
|
||||
// ToggleGroupBackfill represents the backfill service.
|
||||
ToggleGroupBackfill
|
||||
)
|
||||
|
||||
// String returns a human-readable representation of the ToggleGroup.
|
||||
func (t ToggleGroup) String() string {
|
||||
switch t {
|
||||
case NilService:
|
||||
return "none active"
|
||||
case ToggleGroupRangeSync:
|
||||
return "range sync"
|
||||
case ToggleGroupBackfill:
|
||||
return "backfill"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
type blockedToggleRoutine struct {
|
||||
id ToggleGroup
|
||||
ch chan struct{}
|
||||
queued time.Time
|
||||
}
|
||||
|
||||
// ServiceToggler provides a mechanism similar to a WaitGroup, enabling multiple goroutines running
|
||||
// in a pre-defined ToggleGroup to collectively acquire an exclusive "lock", which is released once
|
||||
// all goroutines in that ToggleGroup have called Release.
|
||||
// See documentation on the Acquire() and Release() methods for details.
|
||||
type ServiceToggler struct {
|
||||
blocked []*blockedToggleRoutine
|
||||
current ToggleGroup
|
||||
mu sync.Mutex
|
||||
active int
|
||||
}
|
||||
|
||||
// NewServiceToggler initialize a ServiceToggler.
|
||||
func NewServiceToggler() *ServiceToggler {
|
||||
return &ServiceToggler{
|
||||
blocked: make([]*blockedToggleRoutine, 0),
|
||||
current: NilService,
|
||||
active: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire blocks until the calling service can have exclusive permission to run.
|
||||
// If the calling service is already the current service and no service is waiting,
|
||||
// Acquire will return immediately. If another service is waitig, it will not be allowed
|
||||
// to acquire the "lock" until the other service has a chance to acquire and release outstanding
|
||||
// lock requets.
|
||||
// If the calling service is not the current service, it will block until all active
|
||||
// threads of the current service have called Release.
|
||||
func (t *ServiceToggler) Acquire(ctx context.Context, id ToggleGroup) error {
|
||||
t.mu.Lock()
|
||||
// This means we are initializing for the first time.
|
||||
// This is different from the normal toggle case because there won't be a call
|
||||
// to release to unblock the thread.
|
||||
if t.current == NilService {
|
||||
t.current = id
|
||||
t.active += 1
|
||||
t.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
// Fast path: if we are already the current service and there is no queue, just proceed.
|
||||
// We never want to starve the other service, so as soon as we have a queue, start toggling
|
||||
// between the different services.
|
||||
if t.current == id && len(t.blocked) == 0 {
|
||||
t.active += 1
|
||||
t.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
// ch will be closed when Release is called by the last active thread of the current service.
|
||||
// until then the call to Acquire will block at the <-ch line below.
|
||||
ch := make(chan struct{})
|
||||
t.blocked = append(t.blocked, &blockedToggleRoutine{
|
||||
id: id,
|
||||
ch: ch,
|
||||
queued: time.Now(),
|
||||
})
|
||||
log.WithField("service", id.String()).WithField("blocked", len(t.blocked)).Debug("Waiting on sync toggle")
|
||||
t.mu.Unlock()
|
||||
select {
|
||||
case <-ch:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Release decrements the active thread counter and manages unblocking threads for the next service in line
|
||||
// if there is a queue.
|
||||
func (t *ServiceToggler) Release(id ToggleGroup) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.current != id {
|
||||
// This is an impossible condition but ignoring it seems safer than breaking other invariants.
|
||||
return
|
||||
}
|
||||
t.active -= 1
|
||||
|
||||
// If there are blocked threads, and this is the last active thread,
|
||||
// release the next blocked thread before completing Release.
|
||||
// We only want the last releaser to manage the blocked queue.
|
||||
if t.active > 0 || len(t.blocked) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
next := t.blocked[0]
|
||||
t.current = next.id
|
||||
t.active += 1
|
||||
close(next.ch) // release the blocked thread
|
||||
for _, next := range t.blocked[1:] {
|
||||
if next.id != t.current {
|
||||
break
|
||||
}
|
||||
t.active += 1
|
||||
close(next.ch)
|
||||
}
|
||||
t.blocked = t.blocked[t.active:]
|
||||
log.WithFields(logrus.Fields{
|
||||
"releasedBy": id.String(),
|
||||
"service": next.id.String(),
|
||||
"unblocked": t.active,
|
||||
"waited": time.Since(next.queued),
|
||||
}).Debug("Unblocked goroutines from sync toggle")
|
||||
// We hold the lock so t.active = number of elements removed from t.blocked
|
||||
}
|
||||
63
beacon-chain/sync/toggle_test.go
Normal file
63
beacon-chain/sync/toggle_test.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
)
|
||||
|
||||
func TestNewToggle(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
toggler := NewServiceToggler()
|
||||
require.Equal(t, NilService, toggler.current)
|
||||
require.Equal(t, 0, len(toggler.blocked))
|
||||
|
||||
start := time.Now()
|
||||
require.NoError(t, toggler.Acquire(ctx, ToggleGroupRangeSync))
|
||||
require.Equal(t, true, time.Since(start) < time.Millisecond)
|
||||
ordered := make([]int, 0)
|
||||
go func() {
|
||||
<-time.After(2 * time.Millisecond)
|
||||
ordered = append(ordered, 1)
|
||||
toggler.Release(ToggleGroupRangeSync)
|
||||
}()
|
||||
require.NoError(t, toggler.Acquire(ctx, ToggleGroupBackfill))
|
||||
ordered = append(ordered, 2)
|
||||
// This assertion ensures that the Acquire call above blocked until the Release call in the goroutine.
|
||||
require.DeepEqual(t, []int{1, 2}, ordered)
|
||||
}
|
||||
|
||||
func TestToggleSequence(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
toggler := NewServiceToggler()
|
||||
require.NoError(t, toggler.Acquire(ctx, ToggleGroupRangeSync))
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
wg.Done()
|
||||
require.NoError(t, toggler.Acquire(ctx, ToggleGroupBackfill))
|
||||
}()
|
||||
go func() {
|
||||
wg.Done()
|
||||
require.NoError(t, toggler.Acquire(ctx, ToggleGroupBackfill))
|
||||
}()
|
||||
wg.Wait()
|
||||
<-time.After(1 * time.Millisecond)
|
||||
require.Equal(t, ToggleGroupRangeSync, toggler.current)
|
||||
require.Equal(t, 1, toggler.active)
|
||||
require.Equal(t, 2, len(toggler.blocked))
|
||||
toggler.Release(ToggleGroupRangeSync)
|
||||
require.Equal(t, ToggleGroupBackfill, toggler.current)
|
||||
require.Equal(t, 2, toggler.active)
|
||||
require.Equal(t, 0, len(toggler.blocked))
|
||||
toggler.Release(ToggleGroupBackfill)
|
||||
require.Equal(t, ToggleGroupBackfill, toggler.current)
|
||||
require.Equal(t, 1, toggler.active)
|
||||
require.Equal(t, 0, len(toggler.blocked))
|
||||
toggler.Release(ToggleGroupBackfill)
|
||||
require.Equal(t, ToggleGroupBackfill, toggler.current)
|
||||
require.Equal(t, 0, toggler.active)
|
||||
require.Equal(t, 0, len(toggler.blocked))
|
||||
}
|
||||
2
changelog/kasey_backfill-initsync-lock.md
Normal file
2
changelog/kasey_backfill-initsync-lock.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Changed
|
||||
- Backfill and range syncing cooperatively share an exclusive "lock" over RPC access. Range syncing will hold the lock for an entire round robin sync cycle, while backfill maintains more coarse grained locks on individual units of work, in order to prioritize initial-sync.
|
||||
Reference in New Issue
Block a user