From 4f9e56fc706e27c27734cf6bff92de97b99cf09a Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 6 Oct 2025 12:55:48 +0200 Subject: [PATCH] Custody Info: Waits for initialization (#15804) * Revert "`createLocalNode`: Wait before retrying to retrieve the custody group count if not present. (#15735)" This reverts commit 4585cdc9322ea545c6ff32954126b504b1a08759. * Revert "Fix no custody info available at start (#15732)" This reverts commit 80eba4e6dd7ff4de33ae006cd859bfff7947a5c5. * Add context to `EarliestAvailableSlot` and `CustodyGroupCount` (no functional change). * Remove double imports. * `EarliestAvailableSlot` and `CustodyGroupCount`: Wait for custody info to be initialized. --- beacon-chain/blockchain/process_block.go | 2 +- beacon-chain/blockchain/setup_test.go | 4 +- beacon-chain/p2p/custody.go | 60 +++++++++++----- beacon-chain/p2p/custody_test.go | 68 +++++++------------ beacon-chain/p2p/discovery.go | 28 ++------ beacon-chain/p2p/discovery_test.go | 8 +++ beacon-chain/p2p/interfaces.go | 4 +- beacon-chain/p2p/service.go | 2 + beacon-chain/p2p/testing/fuzz_p2p.go | 4 +- beacon-chain/p2p/testing/p2p.go | 4 +- beacon-chain/sync/custody.go | 7 +- beacon-chain/sync/custody_test.go | 19 +++--- beacon-chain/sync/data_column_sidecars.go | 5 +- .../sync/data_column_sidecars_test.go | 3 +- .../sync/initial-sync/blocks_fetcher.go | 2 +- beacon-chain/sync/initial-sync/service.go | 2 +- .../sync/initial-sync/service_test.go | 2 +- .../sync/rpc_beacon_blocks_by_root.go | 2 +- beacon-chain/sync/rpc_status.go | 2 +- beacon-chain/sync/subscriber.go | 2 +- beacon-chain/sync/subscriber_beacon_blocks.go | 2 +- changelog/manu-fix-cgc-not-initialized.md | 2 - changelog/manu-wait-init-custody-info.md | 2 + changelog/manu-wait.md | 2 - 24 files changed, 121 insertions(+), 117 deletions(-) delete mode 100644 changelog/manu-fix-cgc-not-initialized.md create mode 100644 changelog/manu-wait-init-custody-info.md delete mode 100644 changelog/manu-wait.md diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index d4da1c621f..26d5508869 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -712,7 +712,7 @@ func (s *Service) areDataColumnsAvailable( nodeID := s.cfg.P2P.NodeID() // Get the custody group sampling size for the node. - custodyGroupCount, err := s.cfg.P2P.CustodyGroupCount() + custodyGroupCount, err := s.cfg.P2P.CustodyGroupCount(ctx) if err != nil { return errors.Wrap(err, "custody group count") } diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 875ff8aab4..b18640f327 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -106,14 +106,14 @@ type mockCustodyManager struct { custodyGroupCount uint64 } -func (dch *mockCustodyManager) EarliestAvailableSlot() (primitives.Slot, error) { +func (dch *mockCustodyManager) EarliestAvailableSlot(context.Context) (primitives.Slot, error) { dch.mut.RLock() defer dch.mut.RUnlock() return dch.earliestAvailableSlot, nil } -func (dch *mockCustodyManager) CustodyGroupCount() (uint64, error) { +func (dch *mockCustodyManager) CustodyGroupCount(context.Context) (uint64, error) { dch.mut.RLock() defer dch.mut.RUnlock() diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go index ec382266c7..0ca21da26d 100644 --- a/beacon-chain/p2p/custody.go +++ b/beacon-chain/p2p/custody.go @@ -1,6 +1,8 @@ package p2p import ( + "context" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" @@ -10,32 +12,28 @@ import ( "github.com/sirupsen/logrus" ) -var errNoCustodyInfo = errors.New("no custody info available") - var _ CustodyManager = (*Service)(nil) // EarliestAvailableSlot returns the earliest available slot. -func (s *Service) EarliestAvailableSlot() (primitives.Slot, error) { - s.custodyInfoLock.RLock() - defer s.custodyInfoLock.RUnlock() - - if s.custodyInfo == nil { - return 0, errors.New("no custody info available") +// It blocks until the custody info is set or the context is done. +func (s *Service) EarliestAvailableSlot(ctx context.Context) (primitives.Slot, error) { + custodyInfo, err := s.waitForCustodyInfo(ctx) + if err != nil { + return 0, errors.Wrap(err, "wait for custody info") } - return s.custodyInfo.earliestAvailableSlot, nil + return custodyInfo.earliestAvailableSlot, nil } // CustodyGroupCount returns the custody group count. -func (s *Service) CustodyGroupCount() (uint64, error) { - s.custodyInfoLock.Lock() - defer s.custodyInfoLock.Unlock() - - if s.custodyInfo == nil { - return 0, errNoCustodyInfo +// It blocks until the custody info is set or the context is done. +func (s *Service) CustodyGroupCount(ctx context.Context) (uint64, error) { + custodyInfo, err := s.waitForCustodyInfo(ctx) + if err != nil { + return 0, errors.Wrap(err, "wait for custody info") } - return s.custodyInfo.groupCount, nil + return custodyInfo.groupCount, nil } // UpdateCustodyInfo updates the stored custody group count to the incoming one @@ -79,6 +77,9 @@ func (s *Service) UpdateCustodyInfo(earliestAvailableSlot primitives.Slot, custo earliestAvailableSlot: earliestAvailableSlot, groupCount: custodyGroupCount, } + + close(s.custodyInfoSet) + return earliestAvailableSlot, custodyGroupCount, nil } @@ -147,6 +148,33 @@ func (s *Service) CustodyGroupCountFromPeer(pid peer.ID) uint64 { return custodyCount } +func (s *Service) waitForCustodyInfo(ctx context.Context) (custodyInfo, error) { + select { + case <-s.custodyInfoSet: + info, ok := s.copyCustodyInfo() + if !ok { + return custodyInfo{}, errors.New("custody info was set but is nil") + } + + return info, nil + case <-ctx.Done(): + return custodyInfo{}, ctx.Err() + } +} + +// copyCustodyInfo returns a copy of the current custody info in a thread-safe manner. +// If no custody info is set, it returns false as the second return value. +func (s *Service) copyCustodyInfo() (custodyInfo, bool) { + s.custodyInfoLock.RLock() + defer s.custodyInfoLock.RUnlock() + + if s.custodyInfo == nil { + return custodyInfo{}, false + } + + return *s.custodyInfo, true +} + // custodyGroupCountFromPeerENR retrieves the custody count from the peer's ENR. // If the ENR is not available, it defaults to the minimum number of custody groups // an honest node custodies and serves samples from. diff --git a/beacon-chain/p2p/custody_test.go b/beacon-chain/p2p/custody_test.go index 8771a95c45..0a875e1127 100644 --- a/beacon-chain/p2p/custody_test.go +++ b/beacon-chain/p2p/custody_test.go @@ -20,58 +20,37 @@ import ( ) func TestEarliestAvailableSlot(t *testing.T) { - t.Run("No custody info available", func(t *testing.T) { - service := &Service{ - custodyInfo: nil, - } + const expected primitives.Slot = 100 - _, err := service.EarliestAvailableSlot() + service := &Service{ + custodyInfoSet: make(chan struct{}), + custodyInfo: &custodyInfo{ + earliestAvailableSlot: expected, + }, + } - require.NotNil(t, err) - }) + close(service.custodyInfoSet) + slot, err := service.EarliestAvailableSlot(t.Context()) - t.Run("Valid custody info", func(t *testing.T) { - const expected primitives.Slot = 100 - - service := &Service{ - custodyInfo: &custodyInfo{ - earliestAvailableSlot: expected, - }, - } - - slot, err := service.EarliestAvailableSlot() - - require.NoError(t, err) - require.Equal(t, expected, slot) - }) + require.NoError(t, err) + require.Equal(t, expected, slot) } func TestCustodyGroupCount(t *testing.T) { - t.Run("No custody info available", func(t *testing.T) { - service := &Service{ - custodyInfo: nil, - } + const expected uint64 = 5 - _, err := service.CustodyGroupCount() + service := &Service{ + custodyInfoSet: make(chan struct{}), + custodyInfo: &custodyInfo{ + groupCount: expected, + }, + } - require.NotNil(t, err) - require.Equal(t, true, strings.Contains(err.Error(), "no custody info available")) - }) + close(service.custodyInfoSet) + count, err := service.CustodyGroupCount(t.Context()) - t.Run("Valid custody info", func(t *testing.T) { - const expected uint64 = 5 - - service := &Service{ - custodyInfo: &custodyInfo{ - groupCount: expected, - }, - } - - count, err := service.CustodyGroupCount() - - require.NoError(t, err) - require.Equal(t, expected, count) - }) + require.NoError(t, err) + require.Equal(t, expected, count) } func TestUpdateCustodyInfo(t *testing.T) { @@ -163,7 +142,8 @@ func TestUpdateCustodyInfo(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { service := &Service{ - custodyInfo: tc.initialCustodyInfo, + custodyInfoSet: make(chan struct{}), + custodyInfo: tc.initialCustodyInfo, } slot, groupCount, err := service.UpdateCustodyInfo(tc.inputSlot, tc.inputGroupCount) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index a58d51ac40..a8b0f59d6b 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -253,7 +253,7 @@ func (s *Service) RefreshPersistentSubnets() { return } - custodyGroupCount, err = s.CustodyGroupCount() + custodyGroupCount, err = s.CustodyGroupCount(s.ctx) if err != nil { log.WithError(err).Error("Could not retrieve custody group count") return @@ -604,27 +604,13 @@ func (s *Service) createLocalNode( localNode = initializeSyncCommSubnets(localNode) if params.FuluEnabled() { - // TODO: Replace this quick fix with a proper synchronization scheme (chan?) - const delay = 1 * time.Second - - var custodyGroupCount uint64 - - err := errNoCustodyInfo - for errors.Is(err, errNoCustodyInfo) { - custodyGroupCount, err = s.CustodyGroupCount() - if errors.Is(err, errNoCustodyInfo) { - log.WithField("delay", delay).Debug("No custody info available yet, retrying later") - time.Sleep(delay) - continue - } - - if err != nil { - return nil, errors.Wrap(err, "retrieve custody group count") - } - - custodyGroupCountEntry := peerdas.Cgc(custodyGroupCount) - localNode.Set(custodyGroupCountEntry) + custodyGroupCount, err := s.CustodyGroupCount(s.ctx) + if err != nil { + return nil, errors.Wrap(err, "could not retrieve custody group count") } + + custodyGroupCountEntry := peerdas.Cgc(custodyGroupCount) + localNode.Set(custodyGroupCountEntry) } if s.cfg != nil && s.cfg.HostAddress != "" { diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 5bb38c350b..b5cb188b4f 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -281,9 +281,13 @@ func TestCreateLocalNode(t *testing.T) { genesisTime: time.Now(), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), cfg: tt.cfg, + ctx: t.Context(), custodyInfo: &custodyInfo{groupCount: custodyRequirement}, + custodyInfoSet: make(chan struct{}), } + close(service.custodyInfoSet) + localNode, err := service.createLocalNode(privKey, address, udpPort, tcpPort, quicPort) if tt.expectedError { require.NotNil(t, err) @@ -912,9 +916,13 @@ func TestRefreshPersistentSubnets(t *testing.T) { peers: p2p.Peers(), genesisTime: time.Now().Add(-time.Duration(tc.epochSinceGenesis*secondsPerEpoch) * time.Second), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), + ctx: t.Context(), + custodyInfoSet: make(chan struct{}), custodyInfo: &custodyInfo{groupCount: custodyGroupCount}, } + close(service.custodyInfoSet) + // Set the listener and the metadata. createListener := func() (*discover.UDPv5, error) { return service.createListener(nil, privateKey) diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index c45dba4b0f..f648bb53cc 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -123,8 +123,8 @@ type ( // CustodyManager abstracts some data columns related methods. CustodyManager interface { - EarliestAvailableSlot() (primitives.Slot, error) - CustodyGroupCount() (uint64, error) + EarliestAvailableSlot(ctx context.Context) (primitives.Slot, error) + CustodyGroupCount(ctx context.Context) (uint64, error) UpdateCustodyInfo(earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error) CustodyGroupCountFromPeer(peer.ID) uint64 } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index eeff1a85c3..d003ba0176 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -91,6 +91,7 @@ type Service struct { peerDisconnectionTime *cache.Cache custodyInfo *custodyInfo custodyInfoLock sync.RWMutex // Lock access to custodyInfo + custodyInfoSet chan struct{} allForkDigests map[[4]byte]struct{} } @@ -137,6 +138,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)), subnetsLock: make(map[uint64]*sync.RWMutex), peerDisconnectionTime: cache.New(1*time.Second, 1*time.Minute), + custodyInfoSet: make(chan struct{}), } ipAddr := prysmnetwork.IPAddr() diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index d7a4d51b3a..3bfea3c5ce 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -199,12 +199,12 @@ func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Disc } // EarliestAvailableSlot -- fake. -func (*FakeP2P) EarliestAvailableSlot() (primitives.Slot, error) { +func (*FakeP2P) EarliestAvailableSlot(context.Context) (primitives.Slot, error) { return 0, nil } // CustodyGroupCount -- fake. -func (*FakeP2P) CustodyGroupCount() (uint64, error) { +func (*FakeP2P) CustodyGroupCount(context.Context) (uint64, error) { return 0, nil } diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index ca12b720bb..b4beaf7633 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -473,7 +473,7 @@ func (*TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Disc } // EarliestAvailableSlot . -func (s *TestP2P) EarliestAvailableSlot() (primitives.Slot, error) { +func (s *TestP2P) EarliestAvailableSlot(context.Context) (primitives.Slot, error) { s.custodyInfoMut.RLock() defer s.custodyInfoMut.RUnlock() @@ -481,7 +481,7 @@ func (s *TestP2P) EarliestAvailableSlot() (primitives.Slot, error) { } // CustodyGroupCount . -func (s *TestP2P) CustodyGroupCount() (uint64, error) { +func (s *TestP2P) CustodyGroupCount(context.Context) (uint64, error) { s.custodyInfoMut.RLock() defer s.custodyInfoMut.RUnlock() diff --git a/beacon-chain/sync/custody.go b/beacon-chain/sync/custody.go index fac1edd892..0e9bc3507f 100644 --- a/beacon-chain/sync/custody.go +++ b/beacon-chain/sync/custody.go @@ -1,6 +1,7 @@ package sync import ( + "context" "strings" "time" @@ -29,13 +30,13 @@ func (s *Service) updateCustodyInfoIfNeeded() error { const minimumPeerCount = 1 // Get our actual custody group count. - actualCustodyGrounpCount, err := s.cfg.p2p.CustodyGroupCount() + actualCustodyGrounpCount, err := s.cfg.p2p.CustodyGroupCount(s.ctx) if err != nil { return errors.Wrap(err, "p2p custody group count") } // Get our target custody group count. - targetCustodyGroupCount, err := s.custodyGroupCount() + targetCustodyGroupCount, err := s.custodyGroupCount(s.ctx) if err != nil { return errors.Wrap(err, "custody group count") } @@ -88,7 +89,7 @@ func (s *Service) updateCustodyInfoIfNeeded() error { // custodyGroupCount computes the custody group count based on the custody requirement, // the validators custody requirement, and whether the node is subscribed to all data subnets. -func (s *Service) custodyGroupCount() (uint64, error) { +func (s *Service) custodyGroupCount(context.Context) (uint64, error) { beaconConfig := params.BeaconConfig() if flags.Get().SubscribeAllDataSubnets { diff --git a/beacon-chain/sync/custody_test.go b/beacon-chain/sync/custody_test.go index 00a110bc5a..4690d58bbc 100644 --- a/beacon-chain/sync/custody_test.go +++ b/beacon-chain/sync/custody_test.go @@ -16,7 +16,6 @@ import ( "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" - eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/testing/require" ) @@ -55,9 +54,9 @@ func setupCustodyTest(t *testing.T, withChain bool) *testSetup { if withChain { const headSlot = primitives.Slot(100) - block, err := blocks.NewSignedBeaconBlock(ð.SignedBeaconBlock{ - Block: ð.BeaconBlock{ - Body: ð.BeaconBlockBody{}, + block, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{ + Block: ðpb.BeaconBlock{ + Body: ðpb.BeaconBlockBody{}, Slot: headSlot, }, }) @@ -90,11 +89,13 @@ func setupCustodyTest(t *testing.T, withChain bool) *testSetup { } func (ts *testSetup) assertCustodyInfo(t *testing.T, expectedSlot primitives.Slot, expectedCount uint64) { - p2pEarliestSlot, err := ts.p2pService.EarliestAvailableSlot() + ctx := t.Context() + + p2pEarliestSlot, err := ts.p2pService.EarliestAvailableSlot(ctx) require.NoError(t, err) require.Equal(t, expectedSlot, p2pEarliestSlot) - p2pCustodyCount, err := ts.p2pService.CustodyGroupCount() + p2pCustodyCount, err := ts.p2pService.CustodyGroupCount(ctx) require.NoError(t, err) require.Equal(t, expectedCount, p2pCustodyCount) @@ -170,13 +171,15 @@ func TestCustodyGroupCount(t *testing.T) { config.CustodyRequirement = 3 params.OverrideBeaconConfig(config) + ctx := t.Context() + t.Run("SubscribeAllDataSubnets enabled returns NumberOfCustodyGroups", func(t *testing.T) { withSubscribeAllDataSubnets(t, func() { service := &Service{ ctx: context.Background(), } - result, err := service.custodyGroupCount() + result, err := service.custodyGroupCount(ctx) require.NoError(t, err) require.Equal(t, config.NumberOfCustodyGroups, result) }) @@ -188,7 +191,7 @@ func TestCustodyGroupCount(t *testing.T) { trackedValidatorsCache: cache.NewTrackedValidatorsCache(), } - result, err := service.custodyGroupCount() + result, err := service.custodyGroupCount(ctx) require.NoError(t, err) require.Equal(t, config.CustodyRequirement, result) }) diff --git a/beacon-chain/sync/data_column_sidecars.go b/beacon-chain/sync/data_column_sidecars.go index 69cca2ca4b..e4d25217b1 100644 --- a/beacon-chain/sync/data_column_sidecars.go +++ b/beacon-chain/sync/data_column_sidecars.go @@ -20,7 +20,6 @@ import ( "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket" "github.com/OffchainLabs/prysm/v6/crypto/rand" - eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" goPeer "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" @@ -921,7 +920,7 @@ func buildByRangeRequests( func buildByRootRequest(indicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool) p2ptypes.DataColumnsByRootIdentifiers { identifiers := make(p2ptypes.DataColumnsByRootIdentifiers, 0, len(indicesByRoot)) for root, indices := range indicesByRoot { - identifier := ð.DataColumnsByRootIdentifier{ + identifier := ðpb.DataColumnsByRootIdentifier{ BlockRoot: root[:], Columns: helpers.SortedSliceFromMap(indices), } @@ -929,7 +928,7 @@ func buildByRootRequest(indicesByRoot map[[fieldparams.RootLength]byte]map[uint6 } // Sort identifiers to have a deterministic output. - slices.SortFunc(identifiers, func(left, right *eth.DataColumnsByRootIdentifier) int { + slices.SortFunc(identifiers, func(left, right *ethpb.DataColumnsByRootIdentifier) int { if cmp := bytes.Compare(left.BlockRoot, right.BlockRoot); cmp != 0 { return cmp } diff --git a/beacon-chain/sync/data_column_sidecars_test.go b/beacon-chain/sync/data_column_sidecars_test.go index 43434c0eed..c7e886d348 100644 --- a/beacon-chain/sync/data_column_sidecars_test.go +++ b/beacon-chain/sync/data_column_sidecars_test.go @@ -23,7 +23,6 @@ import ( "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" - pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/testing/assert" "github.com/OffchainLabs/prysm/v6/testing/require" "github.com/OffchainLabs/prysm/v6/testing/util" @@ -144,7 +143,7 @@ func TestFetchDataColumnSidecars(t *testing.T) { HeadSlot: 8, }) - p2p.Peers().SetMetadata(other.PeerID(), wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + p2p.Peers().SetMetadata(other.PeerID(), wrapper.WrappedMetadataV2(ðpb.MetaDataV2{ CustodyGroupCount: 128, })) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 4b891a70f5..aa6802fcc4 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -384,7 +384,7 @@ func (f *blocksFetcher) fetchSidecars(ctx context.Context, pid peer.ID, peers [] } // Compute the columns to request. - custodyGroupCount, err := f.p2p.CustodyGroupCount() + custodyGroupCount, err := f.p2p.CustodyGroupCount(ctx) if err != nil { return blobsPid, errors.Wrap(err, "custody group count") } diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index eb05deebca..7ca5da18ab 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -413,7 +413,7 @@ func (s *Service) fetchOriginDataColumnSidecars(roBlock blocks.ROBlock, delay ti } // Compute the indices we need to custody. - custodyGroupCount, err := s.cfg.P2P.CustodyGroupCount() + custodyGroupCount, err := s.cfg.P2P.CustodyGroupCount(s.ctx) if err != nil { return errors.Wrap(err, "custody group count") } diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index f6a7eb5dce..6a88ddf153 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -629,7 +629,7 @@ func TestFetchOriginSidecars(t *testing.T) { // Compute the columns to request. p2p := p2ptest.NewTestP2P(t) - custodyGroupCount, err := p2p.CustodyGroupCount() + custodyGroupCount, err := p2p.CustodyGroupCount(t.Context()) require.NoError(t, err) samplingSize := max(custodyGroupCount, samplesPerSlot) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index af0fec09a8..fb0c904594 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -99,7 +99,7 @@ func (s *Service) requestAndSaveMissingDataColumnSidecars(blks []blocks.ROBlock) samplesPerSlot := params.BeaconConfig().SamplesPerSlot - custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount() + custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount(s.ctx) if err != nil { return errors.Wrap(err, "custody group count") } diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 41a8eb9c61..3fbc0ba0a0 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -354,7 +354,7 @@ func (s *Service) buildStatusFromStream( } if streamVersion == p2p.SchemaVersionV2 { - earliestAvailableSlot, err := s.cfg.p2p.EarliestAvailableSlot() + earliestAvailableSlot, err := s.cfg.p2p.EarliestAvailableSlot(s.ctx) if err != nil { return nil, errors.Wrap(err, "earliest available slot") } diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 6518fd8d0a..4d32c10b65 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -680,7 +680,7 @@ func (s *Service) samplingSize() (uint64, error) { return 0, errors.Wrap(err, "validators custody requirement") } - custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount() + custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount(s.ctx) if err != nil { return 0, errors.Wrap(err, "custody group count") } diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 4196dd991d..8278f67ed7 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -299,7 +299,7 @@ func (s *Service) columnIndicesToSample() (map[uint64]bool, error) { nodeID := s.cfg.p2p.NodeID() // Get the custody group sampling size for the node. - custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount() + custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount(s.ctx) if err != nil { return nil, errors.Wrap(err, "custody group count") } diff --git a/changelog/manu-fix-cgc-not-initialized.md b/changelog/manu-fix-cgc-not-initialized.md deleted file mode 100644 index 80d40a9da9..0000000000 --- a/changelog/manu-fix-cgc-not-initialized.md +++ /dev/null @@ -1,2 +0,0 @@ -### Fixed -- In P2P service start, wait for the custody info to be correctly initialized. \ No newline at end of file diff --git a/changelog/manu-wait-init-custody-info.md b/changelog/manu-wait-init-custody-info.md new file mode 100644 index 0000000000..28650d4d3e --- /dev/null +++ b/changelog/manu-wait-init-custody-info.md @@ -0,0 +1,2 @@ +### Fixed +- Wait for custody info to be initialized before querying them. \ No newline at end of file diff --git a/changelog/manu-wait.md b/changelog/manu-wait.md deleted file mode 100644 index cc3ea015ac..0000000000 --- a/changelog/manu-wait.md +++ /dev/null @@ -1,2 +0,0 @@ -### Fixed -- `createLocalNode`: Wait before retrying to retrieve the custody group count if not present.