From 320c4764cee3d7a089ea3d7d7f271b8f8c67634f Mon Sep 17 00:00:00 2001 From: Matilda-Clerke Date: Thu, 19 Dec 2024 10:39:59 +1100 Subject: [PATCH] Replace waitforpeertask with ethpeers method (#8009) * 7582: Add waitForPeer method to PeerSelector and EthPeers Signed-off-by: Matilda Clerke * 7582: Replace all usages of WaitForPeer[s]Task with new EthPeers.waitForPeer method Signed-off-by: Matilda Clerke * 7582: Fix PivotBlockConfirmerTest Signed-off-by: Matilda Clerke * 7582: spotless Signed-off-by: Matilda Clerke * 7582: Fix broken PivotBlockRetrieverTest Signed-off-by: Matilda Clerke * 7582: Fix broken FastSyncActionsTest Signed-off-by: Matilda Clerke * 7582: spotless Signed-off-by: Matilda Clerke * 7582: Fix issues after merge Signed-off-by: Matilda Clerke * 7582: Put AbstractSyncTargetManager.waitForPeerAndThenSetSyncTarget code back separate thread to avoid infinite loop waiting for peers during acceptance tests Signed-off-by: Matilda Clerke * 7582: Remove pivot block checks when waiting for peer in FastSyncActions Signed-off-by: Matilda Clerke * 7582: Remove estimated chain height check from PivotBlockConfirmer when waiting for peers Signed-off-by: Matilda Clerke * 7582: Fix broken PivotBlockRetrieverTest Signed-off-by: Matilda Clerke * Use isSuitablePeer as peer selection criteria when waiting for a peer in AbstractRetryingPeerTask Signed-off-by: Matilda Clerke * Remove MetricsSystem from PivotSelectorFromPeers Signed-off-by: Matilda Clerke --------- Signed-off-by: Matilda Clerke --- .../controller/BesuControllerBuilder.java | 3 +- .../qbft/BFTPivotSelectorFromPeers.java | 5 +- .../qbft/sync/QbftPivotSelectorTest.java | 34 +---- .../besu/ethereum/eth/manager/EthPeers.java | 34 +++++ .../eth/manager/peertask/PeerSelector.java | 9 ++ .../task/AbstractRetryingPeerTask.java | 8 +- .../eth/manager/task/WaitForPeerTask.java | 64 --------- .../eth/manager/task/WaitForPeersTask.java | 83 ------------ .../eth/sync/AbstractSyncTargetManager.java | 14 +- .../backwardsync/BackwardSyncAlgorithm.java | 14 +- .../eth/sync/fastsync/FastSyncActions.java | 14 +- .../sync/fastsync/PivotBlockConfirmer.java | 16 ++- .../sync/fastsync/PivotSelectorFromPeers.java | 21 +-- .../fastsync/PivotSelectorFromSafeBlock.java | 12 +- .../eth/manager/task/WaitForPeerTaskTest.java | 87 ------------ .../manager/task/WaitForPeersTaskTest.java | 105 --------------- .../sync/fastsync/FastSyncActionsTest.java | 127 ++++-------------- .../fastsync/PivotBlockRetrieverTest.java | 12 +- 18 files changed, 122 insertions(+), 540 deletions(-) delete mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTask.java delete mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTask.java delete mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTaskTest.java delete mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTaskTest.java diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 9498f7b2d..d10ab0436 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -923,7 +923,6 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides ethContext, syncConfig, syncState, - metricsSystem, protocolContext, nodeKey, blockchain.getChainHeadHeader()); @@ -953,7 +952,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides unsubscribeForkchoiceListener); } else { LOG.info("TTD difficulty is not present, creating initial sync phase for PoW"); - return new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem); + return new PivotSelectorFromPeers(ethContext, syncConfig, syncState); } } diff --git a/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/BFTPivotSelectorFromPeers.java b/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/BFTPivotSelectorFromPeers.java index 7437b349a..78ad18166 100644 --- a/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/BFTPivotSelectorFromPeers.java +++ b/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/BFTPivotSelectorFromPeers.java @@ -26,7 +26,6 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException; import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; -import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,7 +55,6 @@ public class BFTPivotSelectorFromPeers extends PivotSelectorFromPeers { * @param ethContext the eth context * @param syncConfig the sync config * @param syncState the sync state - * @param metricsSystem the metrics * @param protocolContext the protocol context * @param nodeKey the node key * @param blockHeader the block header @@ -65,11 +63,10 @@ public class BFTPivotSelectorFromPeers extends PivotSelectorFromPeers { final EthContext ethContext, final SynchronizerConfiguration syncConfig, final SyncState syncState, - final MetricsSystem metricsSystem, final ProtocolContext protocolContext, final NodeKey nodeKey, final BlockHeader blockHeader) { - super(ethContext, syncConfig, syncState, metricsSystem); + super(ethContext, syncConfig, syncState); this.protocolContext = protocolContext; this.blockHeader = blockHeader; this.nodeKey = nodeKey; diff --git a/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/sync/QbftPivotSelectorTest.java b/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/sync/QbftPivotSelectorTest.java index 2f3013ea7..76f41dc65 100644 --- a/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/sync/QbftPivotSelectorTest.java +++ b/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/sync/QbftPivotSelectorTest.java @@ -33,7 +33,6 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; -import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.ArrayList; import java.util.List; @@ -55,7 +54,6 @@ public class QbftPivotSelectorTest { @Mock private ProtocolContext protocolContext; @Mock private BftContext bftContext; @Mock private SyncState syncState; - @Mock private MetricsSystem metricsSystem; @Mock private EthContext ethContext; @Mock private EthPeers ethPeers; @Mock private ValidatorProvider validatorProvider; @@ -80,13 +78,7 @@ public class QbftPivotSelectorTest { when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList); BFTPivotSelectorFromPeers pivotSelector = new BFTPivotSelectorFromPeers( - ethContext, - syncConfig, - syncState, - metricsSystem, - protocolContext, - nodeKey, - blockHeader); + ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader); Optional pivotState = pivotSelector.selectNewPivotBlock(); assertThat(pivotState.isEmpty()).isTrue(); } @@ -104,13 +96,7 @@ public class QbftPivotSelectorTest { when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList); BFTPivotSelectorFromPeers pivotSelector = new BFTPivotSelectorFromPeers( - ethContext, - syncConfig, - syncState, - metricsSystem, - protocolContext, - nodeKey, - blockHeader); + ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader); try { Optional pivotState = pivotSelector.selectNewPivotBlock(); @@ -126,13 +112,7 @@ public class QbftPivotSelectorTest { when(validatorProvider.nodeIsValidator(any())).thenReturn(false); BFTPivotSelectorFromPeers pivotSelector = new BFTPivotSelectorFromPeers( - ethContext, - syncConfig, - syncState, - metricsSystem, - protocolContext, - nodeKey, - blockHeader); + ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader); Optional pivotState = pivotSelector.selectNewPivotBlock(); assertThat(pivotState.isEmpty()).isTrue(); @@ -145,13 +125,7 @@ public class QbftPivotSelectorTest { when(blockHeader.getNumber()).thenReturn(10L); BFTPivotSelectorFromPeers pivotSelector = new BFTPivotSelectorFromPeers( - ethContext, - syncConfig, - syncState, - metricsSystem, - protocolContext, - nodeKey, - blockHeader); + ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader); Optional pivotState = pivotSelector.selectNewPivotBlock(); assertThat(pivotState.isEmpty()).isTrue(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index fda8bb0f1..c91ed9646 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -474,6 +474,40 @@ public class EthPeers implements PeerSelector { .min(LEAST_TO_MOST_BUSY); } + // Part of the PeerSelector interface, to be split apart later + @Override + public CompletableFuture waitForPeer(final Predicate filter) { + final CompletableFuture future = new CompletableFuture<>(); + LOG.debug("Waiting for peer matching filter. {} peers currently connected.", peerCount()); + // check for an existing peer matching the filter and use that if one is found + Optional maybePeer = getPeer(filter); + if (maybePeer.isPresent()) { + LOG.debug("Found peer matching filter already connected!"); + future.complete(maybePeer.get()); + } else { + // no existing peer matches our filter. Subscribe to new connections until we find one + LOG.debug("Subscribing to new peer connections to wait until one matches filter"); + final long subscriptionId = + subscribeConnect( + (peer) -> { + if (!future.isDone() && filter.test(peer)) { + LOG.debug("Found new peer matching filter!"); + future.complete(peer); + } else { + LOG.debug("New peer does not match filter"); + } + }); + future.handle( + (peer, throwable) -> { + LOG.debug("Unsubscribing from new peer connections with ID {}", subscriptionId); + unsubscribeConnect(subscriptionId); + return null; + }); + } + + return future; + } + // Part of the PeerSelector interface, to be split apart later @Override public Optional getPeerByPeerId(final PeerId peerId) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java index 8f7ab33e4..73f01e71c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; /** Selects the EthPeers for the PeerTaskExecutor */ @@ -31,6 +32,14 @@ public interface PeerSelector { */ Optional getPeer(final Predicate filter); + /** + * Waits for a peer matching the supplied filter + * + * @param filter a Predicate\ matching desirable peers + * @return a CompletableFuture into which a peer will be placed + */ + CompletableFuture waitForPeer(final Predicate filter); + /** * Attempts to get the EthPeer identified by peerId * diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java index e7f1556b5..dd75a6eff 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.util.ExceptionUtils; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; @@ -129,13 +130,12 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { "No useful peer found, wait max 5 seconds for new peer to connect: current peers {}", ethContext.getEthPeers().peerCount()); - final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem); executeSubTask( () -> ethContext - .getScheduler() - // wait for a new peer for up to 5 seconds - .timeout(waitTask, Duration.ofSeconds(5)) + .getEthPeers() + .waitForPeer(this::isSuitablePeer) + .orTimeout(5, TimeUnit.SECONDS) // execute the task again .whenComplete((r, t) -> executeTaskTimed())); return; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTask.java deleted file mode 100644 index 44ced57e7..000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTask.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.task; - -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.EthPeers; -import org.hyperledger.besu.plugin.services.MetricsSystem; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Wait for a single new peer to connect. */ -public class WaitForPeerTask extends AbstractEthTask { - private static final Logger LOG = LoggerFactory.getLogger(WaitForPeerTask.class); - - private final EthContext ethContext; - private volatile Long peerListenerId; - - private WaitForPeerTask(final EthContext ethContext, final MetricsSystem metricsSystem) { - super(metricsSystem); - this.ethContext = ethContext; - } - - public static WaitForPeerTask create( - final EthContext ethContext, final MetricsSystem metricsSystem) { - return new WaitForPeerTask(ethContext, metricsSystem); - } - - @Override - protected void executeTask() { - final EthPeers ethPeers = ethContext.getEthPeers(); - LOG.debug( - "Waiting for new peer connection. {} peers currently connected.", ethPeers.peerCount()); - // Listen for peer connections and complete task when we hit our target - peerListenerId = - ethPeers.subscribeConnect( - (peer) -> { - LOG.debug("Finished waiting for peer connection."); - // We hit our target - result.complete(null); - }); - } - - @Override - protected void cleanup() { - super.cleanup(); - final Long listenerId = peerListenerId; - if (listenerId != null) { - ethContext.getEthPeers().unsubscribeConnect(listenerId); - } - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTask.java deleted file mode 100644 index b0d407702..000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTask.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.task; - -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.EthPeers; -import org.hyperledger.besu.plugin.services.MetricsSystem; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Waits for some number of peers to connect. */ -public class WaitForPeersTask extends AbstractEthTask { - private static final Logger LOG = LoggerFactory.getLogger(WaitForPeersTask.class); - - private final int targetPeerCount; - private final EthContext ethContext; - private volatile Long peerListenerId; - - private WaitForPeersTask( - final EthContext ethContext, final int targetPeerCount, final MetricsSystem metricsSystem) { - super(metricsSystem); - this.targetPeerCount = targetPeerCount; - this.ethContext = ethContext; - } - - public static WaitForPeersTask create( - final EthContext ethContext, final int targetPeerCount, final MetricsSystem metricsSystem) { - return new WaitForPeersTask(ethContext, targetPeerCount, metricsSystem); - } - - @Override - protected void executeTask() { - final EthPeers ethPeers = ethContext.getEthPeers(); - if (ethPeers.peerCount() >= targetPeerCount) { - LOG.debug("We already hit our target of at least {} peers connected", targetPeerCount); - result.complete(null); - return; - } - - LOG.info( - "Waiting for {} total peers to connect. {} peers currently connected.", - targetPeerCount, - ethPeers.peerCount()); - // Listen for peer connections and complete task when we hit our target - peerListenerId = - ethPeers.subscribeConnect( - (peer) -> { - final int peerCount = ethPeers.peerCount(); - if (peerCount >= targetPeerCount) { - LOG.debug("Complete: {} peers connected.", targetPeerCount); - // We hit our target - result.complete(null); - } else { - LOG.debug( - "Waiting for {} total peers to connect. {} peers currently connected.", - targetPeerCount, - peerCount); - } - }); - } - - @Override - protected void cleanup() { - super.cleanup(); - final Long listenerId = peerListenerId; - if (listenerId != null) { - ethContext.getEthPeers().unsubscribeConnect(peerListenerId); - } - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/AbstractSyncTargetManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/AbstractSyncTargetManager.java index a60892f15..46d9aed20 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/AbstractSyncTargetManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/AbstractSyncTargetManager.java @@ -19,7 +19,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeerTask; import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget; import org.hyperledger.besu.ethereum.eth.sync.tasks.DetermineCommonAncestorTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; @@ -117,13 +116,16 @@ public abstract class AbstractSyncTargetManager { protected abstract CompletableFuture> selectBestAvailableSyncTarget(); private CompletableFuture waitForPeerAndThenSetSyncTarget() { - return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget()); - } - - private CompletableFuture waitForNewPeer() { return ethContext .getScheduler() - .timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5)); + .scheduleFutureTask( + () -> + ethContext + .getEthPeers() + .waitForPeer((peer) -> true) + .handle((ignored, ignored2) -> null) + .thenCompose((r) -> findSyncTarget()), + Duration.ofSeconds(5)); } private boolean isCancelled() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgorithm.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgorithm.java index 506b36073..a7311ac46 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgorithm.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgorithm.java @@ -21,7 +21,6 @@ import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException; -import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.plugin.services.BesuEvents; import java.util.Optional; @@ -189,7 +188,12 @@ public class BackwardSyncAlgorithm implements BesuEvents.InitialSyncCompletionLi final boolean await = latch.get().await(2, TimeUnit.MINUTES); if (await) { LOG.debug("Preconditions meet, ensure at least one peer is connected"); - waitForPeers(1).get(); + context + .getEthContext() + .getEthPeers() + .waitForPeer((peer) -> true) + .orTimeout(5, TimeUnit.SECONDS) + .get(); } } } catch (InterruptedException e) { @@ -211,12 +215,6 @@ public class BackwardSyncAlgorithm implements BesuEvents.InitialSyncCompletionLi } } - private CompletableFuture waitForPeers(final int count) { - final WaitForPeersTask waitForPeersTask = - WaitForPeersTask.create(context.getEthContext(), count, context.getMetricsSystem()); - return waitForPeersTask.run(); - } - @Override public void onInitialSyncCompleted() { countDownIfReady(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index 0ce6d2f2f..2eb6b4603 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -24,7 +24,6 @@ import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersExce import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask; -import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -131,11 +130,13 @@ public class FastSyncActions { private CompletableFuture internalDownloadPivotBlockHeader( final FastSyncState currentState) { if (currentState.hasPivotBlockHeader()) { - LOG.info("Initial sync state {} already contains the block header", currentState); + LOG.debug("Initial sync state {} already contains the block header", currentState); return completedFuture(currentState); } - return waitForPeers(1) + return ethContext + .getEthPeers() + .waitForPeer((peer) -> true) .thenCompose( unused -> currentState @@ -245,11 +246,4 @@ public class FastSyncActions { public boolean isBlockchainBehind(final long blockNumber) { return protocolContext.getBlockchain().getChainHeadHeader().getNumber() < blockNumber; } - - private CompletableFuture waitForPeers(final int count) { - - final WaitForPeersTask waitForPeersTask = - WaitForPeersTask.create(ethContext, count, metricsSystem); - return waitForPeersTask.run(); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java index e8a07c920..9b3b40f8a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java @@ -21,7 +21,6 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorRespon import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; -import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeerTask; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; @@ -175,7 +174,6 @@ class PivotBlockConfirmer { // Stop loop if this task is done return CompletableFuture.failedFuture(new CancellationException()); } - final Optional query = createPivotQuery(blockNumber); final CompletableFuture pivotHeaderFuture; if (query.isPresent()) { @@ -188,9 +186,17 @@ class PivotBlockConfirmer { pivotHeaderFuture = ethContext .getScheduler() - .timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5)) - .handle((err, res) -> null) // Ignore result - .thenCompose(res -> executePivotQuery(blockNumber)); + .scheduleFutureTask( + () -> + ethContext + .getEthPeers() + .waitForPeer( + (peer) -> !pivotBlockQueriesByPeerId.containsKey(peer.nodeId())) + // Ignore result, ensure even a timeout will result in calling + // executePivotQuery + .handle((r, e) -> null) + .thenCompose(res -> executePivotQuery(blockNumber)), + Duration.ofSeconds(5)); } return pivotHeaderFuture; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java index 16f7e09d6..9816dc66d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java @@ -17,13 +17,11 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerLimiter; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; -import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -38,17 +36,14 @@ public class PivotSelectorFromPeers implements PivotBlockSelector { protected final EthContext ethContext; protected final SynchronizerConfiguration syncConfig; private final SyncState syncState; - private final MetricsSystem metricsSystem; public PivotSelectorFromPeers( final EthContext ethContext, final SynchronizerConfiguration syncConfig, - final SyncState syncState, - final MetricsSystem metricsSystem) { + final SyncState syncState) { this.ethContext = ethContext; this.syncConfig = syncConfig; this.syncState = syncState; - this.metricsSystem = metricsSystem; } @Override @@ -58,15 +53,19 @@ public class PivotSelectorFromPeers implements PivotBlockSelector { @Override public CompletableFuture prepareRetry() { + final long estimatedPivotBlock = conservativelyEstimatedPivotBlock(); final TrailingPeerLimiter trailingPeerLimiter = new TrailingPeerLimiter( ethContext.getEthPeers(), () -> new TrailingPeerRequirements( - conservativelyEstimatedPivotBlock(), syncConfig.getMaxTrailingPeers())); + estimatedPivotBlock, syncConfig.getMaxTrailingPeers())); trailingPeerLimiter.enforceTrailingPeerLimit(); - return waitForPeers(syncConfig.getSyncMinimumPeerCount()); + return ethContext + .getEthPeers() + .waitForPeer((peer) -> peer.chainState().getEstimatedHeight() >= estimatedPivotBlock) + .thenRun(() -> {}); } @Override @@ -129,10 +128,4 @@ public class PivotSelectorFromPeers implements PivotBlockSelector { syncState.getLocalChainHeight() + syncConfig.getSyncPivotDistance(); return Math.min(syncState.bestChainHeight(), estimatedNextPivot); } - - private CompletableFuture waitForPeers(final int count) { - final WaitForPeersTask waitForPeersTask = - WaitForPeersTask.create(ethContext, count, metricsSystem); - return waitForPeersTask.run(); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromSafeBlock.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromSafeBlock.java index bfbba248c..f30fb54e2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromSafeBlock.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromSafeBlock.java @@ -23,7 +23,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask; -import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask; @@ -129,7 +128,9 @@ public class PivotSelectorFromSafeBlock implements PivotBlockSelector { LOG.debug( "Downloading chain head block header by hash {}", headBlockHash); try { - return waitForPeers(1) + return ethContext + .getEthPeers() + .waitForPeer((peer) -> true) .thenCompose(unused -> downloadBlockHeader(headBlockHash)) .thenApply( blockHeader -> { @@ -193,11 +194,4 @@ public class PivotSelectorFromSafeBlock implements PivotBlockSelector { } }); } - - private CompletableFuture waitForPeers(final int count) { - - final WaitForPeersTask waitForPeersTask = - WaitForPeersTask.create(ethContext, count, metricsSystem); - return waitForPeersTask.run(); - } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTaskTest.java deleted file mode 100644 index 07bce17b6..000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTaskTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.task; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestBuilder; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; -import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; -import org.hyperledger.besu.plugin.services.MetricsSystem; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class WaitForPeerTaskTest { - private EthProtocolManager ethProtocolManager; - private EthContext ethContext; - private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); - - @BeforeEach - public void setupTest() { - ethProtocolManager = EthProtocolManagerTestBuilder.builder().build(); - ethContext = ethProtocolManager.ethContext(); - } - - @Test - public void completesWhenPeerConnects() throws ExecutionException, InterruptedException { - // Execute task and wait for response - final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeerTask.create(ethContext, metricsSystem); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - if (error == null) { - successful.compareAndSet(false, true); - } - }); - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - assertThat(successful).isTrue(); - } - - @Test - public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException { - final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeerTask.create(ethContext, metricsSystem); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - if (error == null) { - successful.compareAndSet(false, true); - } - }); - - assertThat(successful).isFalse(); - } - - @Test - public void cancel() throws ExecutionException, InterruptedException { - // Execute task - final EthTask task = WaitForPeerTask.create(ethContext, metricsSystem); - final CompletableFuture future = task.run(); - - assertThat(future.isDone()).isFalse(); - task.cancel(); - assertThat(future.isDone()).isTrue(); - assertThat(future.isCancelled()).isTrue(); - assertThat(task.run().isCancelled()).isTrue(); - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTaskTest.java deleted file mode 100644 index c4bf49513..000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTaskTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.task; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestBuilder; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; -import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; -import org.hyperledger.besu.plugin.services.MetricsSystem; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class WaitForPeersTaskTest { - private EthProtocolManager ethProtocolManager; - private EthContext ethContext; - private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); - - @BeforeEach - public void setupTest() { - ethProtocolManager = EthProtocolManagerTestBuilder.builder().build(); - ethContext = ethProtocolManager.ethContext(); - } - - @Test - public void completesWhenPeersConnects() throws ExecutionException, InterruptedException { - // Execute task and wait for response - final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeersTask.create(ethContext, 2, metricsSystem); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - if (error == null) { - successful.compareAndSet(false, true); - } - }); - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - assertThat(successful).isTrue(); - } - - @Test - public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException { - final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeersTask.create(ethContext, 2, metricsSystem); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - if (error == null) { - successful.compareAndSet(false, true); - } - }); - - assertThat(successful).isFalse(); - } - - @Test - public void doesNotCompleteWhenSomePeersConnects() - throws ExecutionException, InterruptedException { - final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeersTask.create(ethContext, 2, metricsSystem); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - if (error == null) { - successful.compareAndSet(false, true); - } - }); - EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - - assertThat(successful).isFalse(); - } - - @Test - public void cancel() throws ExecutionException, InterruptedException { - // Execute task - final EthTask task = WaitForPeersTask.create(ethContext, 2, metricsSystem); - final CompletableFuture future = task.run(); - - assertThat(future.isDone()).isFalse(); - task.cancel(); - assertThat(future.isDone()).isTrue(); - assertThat(future.isCancelled()).isTrue(); - assertThat(task.run().isCancelled()).isTrue(); - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index ffce23ee2..fe9683059 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -16,6 +16,8 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.hyperledger.besu.config.GenesisConfigOptions; @@ -29,7 +31,6 @@ import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestBuilder; @@ -119,8 +120,7 @@ public class FastSyncActionsTest { metricsSystem = new NoOpMetricsSystem(); fastSyncActions = createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); + syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState)); } @ParameterizedTest @@ -170,8 +170,7 @@ public class FastSyncActionsTest { fastSyncActions = createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); + syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState)); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000); @@ -188,8 +187,7 @@ public class FastSyncActionsTest { setUp(storageFormat, false, Optional.of(1)); fastSyncActions = createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); + syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState)); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(1000), 5500); EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(2000), 4000); @@ -208,8 +206,7 @@ public class FastSyncActionsTest { EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); fastSyncActions = createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); + syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState)); final CompletableFuture result = fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); @@ -231,100 +228,27 @@ public class FastSyncActionsTest { @ParameterizedTest @ArgumentsSource(FastSyncActionsTest.FastSyncActionsTestArguments.class) - public void selectPivotBlockShouldWaitAndRetryIfSufficientChainHeightEstimatesAreUnavailable( + public void selectPivotBlockShouldRetryIfPivotBlockSelectorReturnsEmptyOptional( final DataStorageFormat storageFormat) { - int minPeers = 3; - setUp(storageFormat, false, Optional.of(minPeers)); - fastSyncActions = - createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); - final long minPivotHeight = syncConfig.getSyncPivotDistance() + 1L; - EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); + setUp(storageFormat, false, Optional.of(3)); - // Create peers without chain height estimates - List peers = new ArrayList<>(); - for (int i = 0; i < minPeers; i++) { - final Difficulty td = Difficulty.of(i); - final OptionalLong height = OptionalLong.empty(); - final RespondingEthPeer peer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height); - peers.add(peer); - } + PivotBlockSelector pivotBlockSelector = mock(PivotBlockSelector.class); + fastSyncActions = createFastSyncActions(syncConfig, pivotBlockSelector); - // No pivot should be selected while peers do not have height estimates - final CompletableFuture result = + FastSyncState expectedResult = new FastSyncState(123); + + when(pivotBlockSelector.selectNewPivotBlock()) + .thenReturn(Optional.empty()) + .thenReturn(Optional.of(expectedResult)); + when(pivotBlockSelector.prepareRetry()).thenReturn(CompletableFuture.runAsync(() -> {})); + + CompletableFuture resultFuture = fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); - assertThat(result).isNotDone(); - EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); - assertThat(result).isNotDone(); - // Set subset of heights - peers - .subList(0, minPeers - 1) - .forEach(p -> p.getEthPeer().chainState().updateHeightEstimate(minPivotHeight + 10)); + verify(pivotBlockSelector, times(2)).selectNewPivotBlock(); + verify(pivotBlockSelector).prepareRetry(); - // No pivot should be selected while only a subset of peers have height estimates - EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); - assertThat(result).isNotDone(); - - // Set final height - final long bestPeerHeight = minPivotHeight + 1; - peers.get(minPeers - 1).getEthPeer().chainState().updateHeightEstimate(bestPeerHeight); - final FastSyncState expected = - new FastSyncState(bestPeerHeight - syncConfig.getSyncPivotDistance()); - EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); - assertThat(result).isCompletedWithValue(expected); - } - - @ParameterizedTest - @ArgumentsSource(FastSyncActionsTest.FastSyncActionsTestArguments.class) - public void selectPivotBlockShouldWaitAndRetryIfSufficientValidatedPeersUnavailable( - final DataStorageFormat storageFormat) { - final int minPeers = 3; - setUp(storageFormat, false, Optional.of(minPeers)); - final PeerValidator validator = mock(PeerValidator.class); - fastSyncActions = - createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); - final long minPivotHeight = syncConfig.getSyncPivotDistance() + 1L; - EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); - - // Create peers that are not validated - final OptionalLong height = OptionalLong.of(minPivotHeight + 10); - List peers = new ArrayList<>(); - for (int i = 0; i < minPeers; i++) { - final Difficulty td = Difficulty.of(i); - - final RespondingEthPeer peer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height, validator); - peers.add(peer); - } - - // No pivot should be selected while peers are not fully validated - final CompletableFuture result = - fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); - assertThat(result).isNotDone(); - EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); - assertThat(result).isNotDone(); - - // Validate a subset of peers - peers.subList(0, minPeers - 1).forEach(p -> p.getEthPeer().markValidated(validator)); - - // No pivot should be selected while only a subset of peers has height estimates - EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); - assertThat(result).isNotDone(); - - // Set best height and mark best peer validated - final long bestPeerHeight = minPivotHeight + 11; - final EthPeer bestPeer = peers.get(minPeers - 1).getEthPeer(); - bestPeer.chainState().updateHeightEstimate(bestPeerHeight); - bestPeer.markValidated(validator); - final FastSyncState expected = - new FastSyncState(bestPeerHeight - syncConfig.getSyncPivotDistance()); - EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); - assertThat(result).isCompletedWithValue(expected); + assertThat(resultFuture).isCompletedWithValue(expectedResult); } @ParameterizedTest @@ -355,8 +279,7 @@ public class FastSyncActionsTest { final int peerCount = 4; fastSyncActions = createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); + syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState)); final long minPivotHeight = syncConfig.getSyncPivotDistance() + 1L; EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); @@ -404,8 +327,7 @@ public class FastSyncActionsTest { setUp(storageFormat, false, Optional.of(1)); fastSyncActions = createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); + syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState)); final long pivotDistance = syncConfig.getSyncPivotDistance(); EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); @@ -466,8 +388,7 @@ public class FastSyncActionsTest { setUp(storageFormat, false, Optional.of(1)); fastSyncActions = createFastSyncActions( - syncConfig, - new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem)); + syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState)); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001); final CompletableFuture result = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java index 2bfc84876..9cd23a54b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java @@ -164,6 +164,7 @@ public class PivotBlockRetrieverTest { // Add new peer that we can query final RespondingEthPeer respondingPeerB = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); respondingPeerB.respond(responder); // We need one more responsive peer before we're done @@ -174,6 +175,7 @@ public class PivotBlockRetrieverTest { // Add new peer that we can query final RespondingEthPeer respondingPeerC = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); respondingPeerC.respond(responder); assertThat(badPeerA.hasOutstandingRequests()).isFalse(); assertThat(badPeerB.hasOutstandingRequests()).isFalse(); @@ -215,11 +217,9 @@ public class PivotBlockRetrieverTest { final RespondingEthPeer respondingPeerB = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator); respondingPeerB.getEthPeer().markValidated(peerValidator); - // When our new peer "connects", it is not yet valid, so we need to expire our retry timeout - // so that the peer will get re-processed - EthProtocolManagerTestUtil.expirePendingTimeouts(ethProtocolManager); + // add another peer to ensure we get past the waitForPeer call + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator); - assertThat(respondingPeerB.hasOutstandingRequests()).isTrue(); respondingPeerB.respond(responder); // We need one more responsive peer before we're done @@ -231,9 +231,9 @@ public class PivotBlockRetrieverTest { final RespondingEthPeer respondingPeerC = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator); respondingPeerC.getEthPeer().markValidated(peerValidator); - EthProtocolManagerTestUtil.expirePendingTimeouts(ethProtocolManager); + // add another peer to ensure we get past the waitForPeer call + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator); - assertThat(respondingPeerC.hasOutstandingRequests()).isTrue(); respondingPeerC.respond(responder); assertThat(badPeerA.hasOutstandingRequests()).isFalse(); assertThat(badPeerB.hasOutstandingRequests()).isFalse();