Replace waitforpeertask with ethpeers method (#8009)

* 7582: Add waitForPeer method to PeerSelector and EthPeers

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Replace all usages of WaitForPeer[s]Task with new EthPeers.waitForPeer method

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Fix PivotBlockConfirmerTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Fix broken PivotBlockRetrieverTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Fix broken FastSyncActionsTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Fix issues after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Put AbstractSyncTargetManager.waitForPeerAndThenSetSyncTarget code back separate thread to avoid infinite loop waiting for peers during acceptance tests

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Remove pivot block checks when waiting for peer in FastSyncActions

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Remove estimated chain height check from PivotBlockConfirmer when waiting for peers

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7582: Fix broken PivotBlockRetrieverTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* Use isSuitablePeer as peer selection criteria when waiting for a peer in AbstractRetryingPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* Remove MetricsSystem from PivotSelectorFromPeers

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

---------

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
This commit is contained in:
Matilda-Clerke
2024-12-19 10:39:59 +11:00
committed by GitHub
parent 3b4136dac5
commit 320c4764ce
18 changed files with 122 additions and 540 deletions

View File

@@ -923,7 +923,6 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockchain.getChainHeadHeader());
@@ -953,7 +952,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
unsubscribeForkchoiceListener);
} else {
LOG.info("TTD difficulty is not present, creating initial sync phase for PoW");
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem);
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState);
}
}

View File

@@ -26,7 +26,6 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,7 +55,6 @@ public class BFTPivotSelectorFromPeers extends PivotSelectorFromPeers {
* @param ethContext the eth context
* @param syncConfig the sync config
* @param syncState the sync state
* @param metricsSystem the metrics
* @param protocolContext the protocol context
* @param nodeKey the node key
* @param blockHeader the block header
@@ -65,11 +63,10 @@ public class BFTPivotSelectorFromPeers extends PivotSelectorFromPeers {
final EthContext ethContext,
final SynchronizerConfiguration syncConfig,
final SyncState syncState,
final MetricsSystem metricsSystem,
final ProtocolContext protocolContext,
final NodeKey nodeKey,
final BlockHeader blockHeader) {
super(ethContext, syncConfig, syncState, metricsSystem);
super(ethContext, syncConfig, syncState);
this.protocolContext = protocolContext;
this.blockHeader = blockHeader;
this.nodeKey = nodeKey;

View File

@@ -33,7 +33,6 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList;
import java.util.List;
@@ -55,7 +54,6 @@ public class QbftPivotSelectorTest {
@Mock private ProtocolContext protocolContext;
@Mock private BftContext bftContext;
@Mock private SyncState syncState;
@Mock private MetricsSystem metricsSystem;
@Mock private EthContext ethContext;
@Mock private EthPeers ethPeers;
@Mock private ValidatorProvider validatorProvider;
@@ -80,13 +78,7 @@ public class QbftPivotSelectorTest {
when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
assertThat(pivotState.isEmpty()).isTrue();
}
@@ -104,13 +96,7 @@ public class QbftPivotSelectorTest {
when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
try {
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
@@ -126,13 +112,7 @@ public class QbftPivotSelectorTest {
when(validatorProvider.nodeIsValidator(any())).thenReturn(false);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
assertThat(pivotState.isEmpty()).isTrue();
@@ -145,13 +125,7 @@ public class QbftPivotSelectorTest {
when(blockHeader.getNumber()).thenReturn(10L);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
assertThat(pivotState.isEmpty()).isTrue();

View File

@@ -474,6 +474,40 @@ public class EthPeers implements PeerSelector {
.min(LEAST_TO_MOST_BUSY);
}
// Part of the PeerSelector interface, to be split apart later
@Override
public CompletableFuture<EthPeer> waitForPeer(final Predicate<EthPeer> filter) {
final CompletableFuture<EthPeer> future = new CompletableFuture<>();
LOG.debug("Waiting for peer matching filter. {} peers currently connected.", peerCount());
// check for an existing peer matching the filter and use that if one is found
Optional<EthPeer> maybePeer = getPeer(filter);
if (maybePeer.isPresent()) {
LOG.debug("Found peer matching filter already connected!");
future.complete(maybePeer.get());
} else {
// no existing peer matches our filter. Subscribe to new connections until we find one
LOG.debug("Subscribing to new peer connections to wait until one matches filter");
final long subscriptionId =
subscribeConnect(
(peer) -> {
if (!future.isDone() && filter.test(peer)) {
LOG.debug("Found new peer matching filter!");
future.complete(peer);
} else {
LOG.debug("New peer does not match filter");
}
});
future.handle(
(peer, throwable) -> {
LOG.debug("Unsubscribing from new peer connections with ID {}", subscriptionId);
unsubscribeConnect(subscriptionId);
return null;
});
}
return future;
}
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {

View File

@@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
/** Selects the EthPeers for the PeerTaskExecutor */
@@ -31,6 +32,14 @@ public interface PeerSelector {
*/
Optional<EthPeer> getPeer(final Predicate<EthPeer> filter);
/**
* Waits for a peer matching the supplied filter
*
* @param filter a Predicate\<EthPeer\> matching desirable peers
* @return a CompletableFuture into which a peer will be placed
*/
CompletableFuture<EthPeer> waitForPeer(final Predicate<EthPeer> filter);
/**
* Attempts to get the EthPeer identified by peerId
*

View File

@@ -26,6 +26,7 @@ import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
@@ -129,13 +130,12 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
"No useful peer found, wait max 5 seconds for new peer to connect: current peers {}",
ethContext.getEthPeers().peerCount());
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem);
executeSubTask(
() ->
ethContext
.getScheduler()
// wait for a new peer for up to 5 seconds
.timeout(waitTask, Duration.ofSeconds(5))
.getEthPeers()
.waitForPeer(this::isSuitablePeer)
.orTimeout(5, TimeUnit.SECONDS)
// execute the task again
.whenComplete((r, t) -> executeTaskTimed()));
return;

View File

@@ -1,64 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Wait for a single new peer to connect. */
public class WaitForPeerTask extends AbstractEthTask<Void> {
private static final Logger LOG = LoggerFactory.getLogger(WaitForPeerTask.class);
private final EthContext ethContext;
private volatile Long peerListenerId;
private WaitForPeerTask(final EthContext ethContext, final MetricsSystem metricsSystem) {
super(metricsSystem);
this.ethContext = ethContext;
}
public static WaitForPeerTask create(
final EthContext ethContext, final MetricsSystem metricsSystem) {
return new WaitForPeerTask(ethContext, metricsSystem);
}
@Override
protected void executeTask() {
final EthPeers ethPeers = ethContext.getEthPeers();
LOG.debug(
"Waiting for new peer connection. {} peers currently connected.", ethPeers.peerCount());
// Listen for peer connections and complete task when we hit our target
peerListenerId =
ethPeers.subscribeConnect(
(peer) -> {
LOG.debug("Finished waiting for peer connection.");
// We hit our target
result.complete(null);
});
}
@Override
protected void cleanup() {
super.cleanup();
final Long listenerId = peerListenerId;
if (listenerId != null) {
ethContext.getEthPeers().unsubscribeConnect(listenerId);
}
}
}

View File

@@ -1,83 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Waits for some number of peers to connect. */
public class WaitForPeersTask extends AbstractEthTask<Void> {
private static final Logger LOG = LoggerFactory.getLogger(WaitForPeersTask.class);
private final int targetPeerCount;
private final EthContext ethContext;
private volatile Long peerListenerId;
private WaitForPeersTask(
final EthContext ethContext, final int targetPeerCount, final MetricsSystem metricsSystem) {
super(metricsSystem);
this.targetPeerCount = targetPeerCount;
this.ethContext = ethContext;
}
public static WaitForPeersTask create(
final EthContext ethContext, final int targetPeerCount, final MetricsSystem metricsSystem) {
return new WaitForPeersTask(ethContext, targetPeerCount, metricsSystem);
}
@Override
protected void executeTask() {
final EthPeers ethPeers = ethContext.getEthPeers();
if (ethPeers.peerCount() >= targetPeerCount) {
LOG.debug("We already hit our target of at least {} peers connected", targetPeerCount);
result.complete(null);
return;
}
LOG.info(
"Waiting for {} total peers to connect. {} peers currently connected.",
targetPeerCount,
ethPeers.peerCount());
// Listen for peer connections and complete task when we hit our target
peerListenerId =
ethPeers.subscribeConnect(
(peer) -> {
final int peerCount = ethPeers.peerCount();
if (peerCount >= targetPeerCount) {
LOG.debug("Complete: {} peers connected.", targetPeerCount);
// We hit our target
result.complete(null);
} else {
LOG.debug(
"Waiting for {} total peers to connect. {} peers currently connected.",
targetPeerCount,
peerCount);
}
});
}
@Override
protected void cleanup() {
super.cleanup();
final Long listenerId = peerListenerId;
if (listenerId != null) {
ethContext.getEthPeers().unsubscribeConnect(peerListenerId);
}
}
}

View File

@@ -19,7 +19,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@@ -117,13 +116,16 @@ public abstract class AbstractSyncTargetManager {
protected abstract CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget();
private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
}
private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5));
.scheduleFutureTask(
() ->
ethContext
.getEthPeers()
.waitForPeer((peer) -> true)
.handle((ignored, ignored2) -> null)
.thenCompose((r) -> findSyncTarget()),
Duration.ofSeconds(5));
}
private boolean isCancelled() {

View File

@@ -21,7 +21,6 @@ import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.plugin.services.BesuEvents;
import java.util.Optional;
@@ -189,7 +188,12 @@ public class BackwardSyncAlgorithm implements BesuEvents.InitialSyncCompletionLi
final boolean await = latch.get().await(2, TimeUnit.MINUTES);
if (await) {
LOG.debug("Preconditions meet, ensure at least one peer is connected");
waitForPeers(1).get();
context
.getEthContext()
.getEthPeers()
.waitForPeer((peer) -> true)
.orTimeout(5, TimeUnit.SECONDS)
.get();
}
}
} catch (InterruptedException e) {
@@ -211,12 +215,6 @@ public class BackwardSyncAlgorithm implements BesuEvents.InitialSyncCompletionLi
}
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(context.getEthContext(), count, context.getMetricsSystem());
return waitForPeersTask.run();
}
@Override
public void onInitialSyncCompleted() {
countDownIfReady();

View File

@@ -24,7 +24,6 @@ import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersExce
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@@ -131,11 +130,13 @@ public class FastSyncActions {
private CompletableFuture<FastSyncState> internalDownloadPivotBlockHeader(
final FastSyncState currentState) {
if (currentState.hasPivotBlockHeader()) {
LOG.info("Initial sync state {} already contains the block header", currentState);
LOG.debug("Initial sync state {} already contains the block header", currentState);
return completedFuture(currentState);
}
return waitForPeers(1)
return ethContext
.getEthPeers()
.waitForPeer((peer) -> true)
.thenCompose(
unused ->
currentState
@@ -245,11 +246,4 @@ public class FastSyncActions {
public boolean isBlockchainBehind(final long blockNumber) {
return protocolContext.getBlockchain().getChainHeadHeader().getNumber() < blockNumber;
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
}

View File

@@ -21,7 +21,6 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorRespon
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@@ -175,7 +174,6 @@ class PivotBlockConfirmer {
// Stop loop if this task is done
return CompletableFuture.failedFuture(new CancellationException());
}
final Optional<RetryingGetHeaderFromPeerByNumberTask> query = createPivotQuery(blockNumber);
final CompletableFuture<BlockHeader> pivotHeaderFuture;
if (query.isPresent()) {
@@ -188,9 +186,17 @@ class PivotBlockConfirmer {
pivotHeaderFuture =
ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5))
.handle((err, res) -> null) // Ignore result
.thenCompose(res -> executePivotQuery(blockNumber));
.scheduleFutureTask(
() ->
ethContext
.getEthPeers()
.waitForPeer(
(peer) -> !pivotBlockQueriesByPeerId.containsKey(peer.nodeId()))
// Ignore result, ensure even a timeout will result in calling
// executePivotQuery
.handle((r, e) -> null)
.thenCompose(res -> executePivotQuery(blockNumber)),
Duration.ofSeconds(5));
}
return pivotHeaderFuture;

View File

@@ -17,13 +17,11 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerLimiter;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -38,17 +36,14 @@ public class PivotSelectorFromPeers implements PivotBlockSelector {
protected final EthContext ethContext;
protected final SynchronizerConfiguration syncConfig;
private final SyncState syncState;
private final MetricsSystem metricsSystem;
public PivotSelectorFromPeers(
final EthContext ethContext,
final SynchronizerConfiguration syncConfig,
final SyncState syncState,
final MetricsSystem metricsSystem) {
final SyncState syncState) {
this.ethContext = ethContext;
this.syncConfig = syncConfig;
this.syncState = syncState;
this.metricsSystem = metricsSystem;
}
@Override
@@ -58,15 +53,19 @@ public class PivotSelectorFromPeers implements PivotBlockSelector {
@Override
public CompletableFuture<Void> prepareRetry() {
final long estimatedPivotBlock = conservativelyEstimatedPivotBlock();
final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethContext.getEthPeers(),
() ->
new TrailingPeerRequirements(
conservativelyEstimatedPivotBlock(), syncConfig.getMaxTrailingPeers()));
estimatedPivotBlock, syncConfig.getMaxTrailingPeers()));
trailingPeerLimiter.enforceTrailingPeerLimit();
return waitForPeers(syncConfig.getSyncMinimumPeerCount());
return ethContext
.getEthPeers()
.waitForPeer((peer) -> peer.chainState().getEstimatedHeight() >= estimatedPivotBlock)
.thenRun(() -> {});
}
@Override
@@ -129,10 +128,4 @@ public class PivotSelectorFromPeers implements PivotBlockSelector {
syncState.getLocalChainHeight() + syncConfig.getSyncPivotDistance();
return Math.min(syncState.bestChainHeight(), estimatedNextPivot);
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
}

View File

@@ -23,7 +23,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask;
@@ -129,7 +128,9 @@ public class PivotSelectorFromSafeBlock implements PivotBlockSelector {
LOG.debug(
"Downloading chain head block header by hash {}", headBlockHash);
try {
return waitForPeers(1)
return ethContext
.getEthPeers()
.waitForPeer((peer) -> true)
.thenCompose(unused -> downloadBlockHeader(headBlockHash))
.thenApply(
blockHeader -> {
@@ -193,11 +194,4 @@ public class PivotSelectorFromSafeBlock implements PivotBlockSelector {
}
});
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
}

View File

@@ -1,87 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestBuilder;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class WaitForPeerTaskTest {
private EthProtocolManager ethProtocolManager;
private EthContext ethContext;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@BeforeEach
public void setupTest() {
ethProtocolManager = EthProtocolManagerTestBuilder.builder().build();
ethContext = ethProtocolManager.ethContext();
}
@Test
public void completesWhenPeerConnects() throws ExecutionException, InterruptedException {
// Execute task and wait for response
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeerTask.create(ethContext, metricsSystem);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
if (error == null) {
successful.compareAndSet(false, true);
}
});
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
assertThat(successful).isTrue();
}
@Test
public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException {
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeerTask.create(ethContext, metricsSystem);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
if (error == null) {
successful.compareAndSet(false, true);
}
});
assertThat(successful).isFalse();
}
@Test
public void cancel() throws ExecutionException, InterruptedException {
// Execute task
final EthTask<Void> task = WaitForPeerTask.create(ethContext, metricsSystem);
final CompletableFuture<Void> future = task.run();
assertThat(future.isDone()).isFalse();
task.cancel();
assertThat(future.isDone()).isTrue();
assertThat(future.isCancelled()).isTrue();
assertThat(task.run().isCancelled()).isTrue();
}
}

View File

@@ -1,105 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestBuilder;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class WaitForPeersTaskTest {
private EthProtocolManager ethProtocolManager;
private EthContext ethContext;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@BeforeEach
public void setupTest() {
ethProtocolManager = EthProtocolManagerTestBuilder.builder().build();
ethContext = ethProtocolManager.ethContext();
}
@Test
public void completesWhenPeersConnects() throws ExecutionException, InterruptedException {
// Execute task and wait for response
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, metricsSystem);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
if (error == null) {
successful.compareAndSet(false, true);
}
});
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
assertThat(successful).isTrue();
}
@Test
public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException {
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, metricsSystem);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
if (error == null) {
successful.compareAndSet(false, true);
}
});
assertThat(successful).isFalse();
}
@Test
public void doesNotCompleteWhenSomePeersConnects()
throws ExecutionException, InterruptedException {
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, metricsSystem);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
if (error == null) {
successful.compareAndSet(false, true);
}
});
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
assertThat(successful).isFalse();
}
@Test
public void cancel() throws ExecutionException, InterruptedException {
// Execute task
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, metricsSystem);
final CompletableFuture<Void> future = task.run();
assertThat(future.isDone()).isFalse();
task.cancel();
assertThat(future.isDone()).isTrue();
assertThat(future.isCancelled()).isTrue();
assertThat(task.run().isCancelled()).isTrue();
}
}

View File

@@ -16,6 +16,8 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.GenesisConfigOptions;
@@ -29,7 +31,6 @@ import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestBuilder;
@@ -119,8 +120,7 @@ public class FastSyncActionsTest {
metricsSystem = new NoOpMetricsSystem();
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState));
}
@ParameterizedTest
@@ -170,8 +170,7 @@ public class FastSyncActionsTest {
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState));
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
@@ -188,8 +187,7 @@ public class FastSyncActionsTest {
setUp(storageFormat, false, Optional.of(1));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState));
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(1000), 5500);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(2000), 4000);
@@ -208,8 +206,7 @@ public class FastSyncActionsTest {
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState));
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
@@ -231,100 +228,27 @@ public class FastSyncActionsTest {
@ParameterizedTest
@ArgumentsSource(FastSyncActionsTest.FastSyncActionsTestArguments.class)
public void selectPivotBlockShouldWaitAndRetryIfSufficientChainHeightEstimatesAreUnavailable(
public void selectPivotBlockShouldRetryIfPivotBlockSelectorReturnsEmptyOptional(
final DataStorageFormat storageFormat) {
int minPeers = 3;
setUp(storageFormat, false, Optional.of(minPeers));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final long minPivotHeight = syncConfig.getSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
setUp(storageFormat, false, Optional.of(3));
// Create peers without chain height estimates
List<RespondingEthPeer> peers = new ArrayList<>();
for (int i = 0; i < minPeers; i++) {
final Difficulty td = Difficulty.of(i);
final OptionalLong height = OptionalLong.empty();
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height);
peers.add(peer);
}
PivotBlockSelector pivotBlockSelector = mock(PivotBlockSelector.class);
fastSyncActions = createFastSyncActions(syncConfig, pivotBlockSelector);
// No pivot should be selected while peers do not have height estimates
final CompletableFuture<FastSyncState> result =
FastSyncState expectedResult = new FastSyncState(123);
when(pivotBlockSelector.selectNewPivotBlock())
.thenReturn(Optional.empty())
.thenReturn(Optional.of(expectedResult));
when(pivotBlockSelector.prepareRetry()).thenReturn(CompletableFuture.runAsync(() -> {}));
CompletableFuture<FastSyncState> resultFuture =
fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
assertThat(result).isNotDone();
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Set subset of heights
peers
.subList(0, minPeers - 1)
.forEach(p -> p.getEthPeer().chainState().updateHeightEstimate(minPivotHeight + 10));
verify(pivotBlockSelector, times(2)).selectNewPivotBlock();
verify(pivotBlockSelector).prepareRetry();
// No pivot should be selected while only a subset of peers have height estimates
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Set final height
final long bestPeerHeight = minPivotHeight + 1;
peers.get(minPeers - 1).getEthPeer().chainState().updateHeightEstimate(bestPeerHeight);
final FastSyncState expected =
new FastSyncState(bestPeerHeight - syncConfig.getSyncPivotDistance());
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isCompletedWithValue(expected);
}
@ParameterizedTest
@ArgumentsSource(FastSyncActionsTest.FastSyncActionsTestArguments.class)
public void selectPivotBlockShouldWaitAndRetryIfSufficientValidatedPeersUnavailable(
final DataStorageFormat storageFormat) {
final int minPeers = 3;
setUp(storageFormat, false, Optional.of(minPeers));
final PeerValidator validator = mock(PeerValidator.class);
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final long minPivotHeight = syncConfig.getSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
// Create peers that are not validated
final OptionalLong height = OptionalLong.of(minPivotHeight + 10);
List<RespondingEthPeer> peers = new ArrayList<>();
for (int i = 0; i < minPeers; i++) {
final Difficulty td = Difficulty.of(i);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, td, height, validator);
peers.add(peer);
}
// No pivot should be selected while peers are not fully validated
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
assertThat(result).isNotDone();
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Validate a subset of peers
peers.subList(0, minPeers - 1).forEach(p -> p.getEthPeer().markValidated(validator));
// No pivot should be selected while only a subset of peers has height estimates
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
// Set best height and mark best peer validated
final long bestPeerHeight = minPivotHeight + 11;
final EthPeer bestPeer = peers.get(minPeers - 1).getEthPeer();
bestPeer.chainState().updateHeightEstimate(bestPeerHeight);
bestPeer.markValidated(validator);
final FastSyncState expected =
new FastSyncState(bestPeerHeight - syncConfig.getSyncPivotDistance());
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isCompletedWithValue(expected);
assertThat(resultFuture).isCompletedWithValue(expectedResult);
}
@ParameterizedTest
@@ -355,8 +279,7 @@ public class FastSyncActionsTest {
final int peerCount = 4;
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState));
final long minPivotHeight = syncConfig.getSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@@ -404,8 +327,7 @@ public class FastSyncActionsTest {
setUp(storageFormat, false, Optional.of(1));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState));
final long pivotDistance = syncConfig.getSyncPivotDistance();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@@ -466,8 +388,7 @@ public class FastSyncActionsTest {
setUp(storageFormat, false, Optional.of(1));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
syncConfig, new PivotSelectorFromPeers(ethContext, syncConfig, syncState));
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001);
final CompletableFuture<FastSyncState> result =

View File

@@ -164,6 +164,7 @@ public class PivotBlockRetrieverTest {
// Add new peer that we can query
final RespondingEthPeer respondingPeerB =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
respondingPeerB.respond(responder);
// We need one more responsive peer before we're done
@@ -174,6 +175,7 @@ public class PivotBlockRetrieverTest {
// Add new peer that we can query
final RespondingEthPeer respondingPeerC =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
respondingPeerC.respond(responder);
assertThat(badPeerA.hasOutstandingRequests()).isFalse();
assertThat(badPeerB.hasOutstandingRequests()).isFalse();
@@ -215,11 +217,9 @@ public class PivotBlockRetrieverTest {
final RespondingEthPeer respondingPeerB =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator);
respondingPeerB.getEthPeer().markValidated(peerValidator);
// When our new peer "connects", it is not yet valid, so we need to expire our retry timeout
// so that the peer will get re-processed
EthProtocolManagerTestUtil.expirePendingTimeouts(ethProtocolManager);
// add another peer to ensure we get past the waitForPeer call
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator);
assertThat(respondingPeerB.hasOutstandingRequests()).isTrue();
respondingPeerB.respond(responder);
// We need one more responsive peer before we're done
@@ -231,9 +231,9 @@ public class PivotBlockRetrieverTest {
final RespondingEthPeer respondingPeerC =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator);
respondingPeerC.getEthPeer().markValidated(peerValidator);
EthProtocolManagerTestUtil.expirePendingTimeouts(ethProtocolManager);
// add another peer to ensure we get past the waitForPeer call
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000, peerValidator);
assertThat(respondingPeerC.hasOutstandingRequests()).isTrue();
respondingPeerC.respond(responder);
assertThat(badPeerA.hasOutstandingRequests()).isFalse();
assertThat(badPeerB.hasOutstandingRequests()).isFalse();