findSyncTarget always called using Optional.empty (#3364)

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>
This commit is contained in:
Jiri Peinlich
2022-02-03 10:25:22 +01:00
committed by GitHub
parent 73a97606ae
commit 33b8b872ff
4 changed files with 29 additions and 40 deletions

View File

@@ -32,7 +32,6 @@ import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -99,7 +98,7 @@ public class PipelineChainDownloader implements ChainDownloader {
private CompletableFuture<Void> selectSyncTargetAndDownload() {
return syncTargetManager
.findSyncTarget(Optional.empty())
.findSyncTarget()
.thenCompose(this::startDownloadForSyncTarget)
.thenRun(pipelineCompleteCounter::inc);
}

View File

@@ -55,14 +55,7 @@ public abstract class SyncTargetManager {
this.metricsSystem = metricsSystem;
}
public CompletableFuture<SyncTarget> findSyncTarget(
final Optional<SyncTarget> currentSyncTarget) {
return currentSyncTarget
.map(CompletableFuture::completedFuture) // Return an existing sync target if present
.orElseGet(this::selectNewSyncTarget);
}
private CompletableFuture<SyncTarget> selectNewSyncTarget() {
public CompletableFuture<SyncTarget> findSyncTarget() {
return selectBestAvailableSyncTarget()
.thenCompose(
maybeBestPeer -> {
@@ -113,7 +106,7 @@ public abstract class SyncTargetManager {
protected abstract CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget();
private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> selectNewSyncTarget());
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
}
private CompletableFuture<?> waitForNewPeer() {

View File

@@ -37,7 +37,6 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.Di
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -82,16 +81,16 @@ public class PipelineChainDownloaderTest {
@Test
public void shouldSelectSyncTargetWhenStarted() {
when(syncTargetManager.findSyncTarget(Optional.empty())).thenReturn(new CompletableFuture<>());
when(syncTargetManager.findSyncTarget()).thenReturn(new CompletableFuture<>());
chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
}
@Test
public void shouldStartChainDownloadWhenTargetSelected() {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget(Optional.empty())).thenReturn(selectTargetFuture);
when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture);
expectPipelineCreation(syncTarget, downloadPipeline);
when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>());
chainDownloader.start();
@@ -106,7 +105,7 @@ public class PipelineChainDownloaderTest {
@Test
public void shouldUpdateSyncStateWhenTargetSelected() {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget(Optional.empty())).thenReturn(selectTargetFuture);
when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture);
expectPipelineCreation(syncTarget, downloadPipeline);
when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>());
chainDownloader.start();
@@ -121,28 +120,28 @@ public class PipelineChainDownloaderTest {
public void shouldRetryWhenSyncTargetSelectionFailsAndSyncTargetManagerShouldContinue() {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.shouldContinueDownloading()).thenReturn(true);
when(syncTargetManager.findSyncTarget(Optional.empty()))
when(syncTargetManager.findSyncTarget())
.thenReturn(selectTargetFuture)
.thenReturn(new CompletableFuture<>());
chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
selectTargetFuture.completeExceptionally(new RuntimeException("Nope"));
verify(syncTargetManager, times(2)).findSyncTarget(Optional.empty());
verify(syncTargetManager, times(2)).findSyncTarget();
}
@Test
public void shouldBeCompleteWhenSyncTargetSelectionFailsAndSyncTargetManagerShouldNotContinue() {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.shouldContinueDownloading()).thenReturn(false);
when(syncTargetManager.findSyncTarget(Optional.empty()))
when(syncTargetManager.findSyncTarget())
.thenReturn(selectTargetFuture)
.thenReturn(new CompletableFuture<>());
final CompletableFuture<Void> result = chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
final RuntimeException exception = new RuntimeException("Nope");
selectTargetFuture.completeExceptionally(exception);
@@ -155,7 +154,7 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<Void> pipelineFuture = expectPipelineStarted(syncTarget);
final CompletableFuture<Void> result = chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
when(syncTargetManager.shouldContinueDownloading()).thenReturn(false);
pipelineFuture.complete(null);
@@ -173,7 +172,7 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<Void> result = chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
// Setup expectation for second time round.
expectPipelineStarted(syncTarget2, downloadPipeline2);
@@ -181,7 +180,7 @@ public class PipelineChainDownloaderTest {
pipelineFuture.complete(null);
assertThat(result).isNotDone();
verify(syncTargetManager, times(2)).findSyncTarget(Optional.empty());
verify(syncTargetManager, times(2)).findSyncTarget();
assertThat(result).isNotDone();
}
@@ -196,15 +195,15 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<Void> result = chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
// Setup expectation for second call
when(syncTargetManager.findSyncTarget(Optional.empty())).thenReturn(findSecondSyncTargetFuture);
when(syncTargetManager.findSyncTarget()).thenReturn(findSecondSyncTargetFuture);
pipelineFuture.complete(null);
assertThat(result).isNotDone();
verify(syncTargetManager, times(2)).findSyncTarget(Optional.empty());
verify(syncTargetManager, times(2)).findSyncTarget();
assertThat(result).isNotDone();
final RuntimeException exception = new RuntimeException("Nope");
@@ -221,10 +220,10 @@ public class PipelineChainDownloaderTest {
public void shouldNotStartDownloadIfCancelledWhileSelectingSyncTarget() {
final CompletableFuture<SyncTarget> selectSyncTargetFuture = new CompletableFuture<>();
lenient().when(syncTargetManager.shouldContinueDownloading()).thenReturn(true);
when(syncTargetManager.findSyncTarget(Optional.empty())).thenReturn(selectSyncTargetFuture);
when(syncTargetManager.findSyncTarget()).thenReturn(selectSyncTargetFuture);
final CompletableFuture<Void> result = chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
chainDownloader.cancel();
// Note the future doesn't complete until all activity has come to a stop.
@@ -241,7 +240,7 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<Void> pipelineFuture = expectPipelineStarted(syncTarget);
final CompletableFuture<Void> result = chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
verify(downloadPipelineFactory).createDownloadPipelineForSyncTarget(syncTarget);
chainDownloader.cancel();
@@ -262,7 +261,7 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<Void> pipelineFuture1 = new CompletableFuture<>();
final CompletableFuture<Void> pipelineFuture2 = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget(Optional.empty()))
when(syncTargetManager.findSyncTarget())
.thenReturn(completedFuture(syncTarget))
.thenReturn(completedFuture(syncTarget2));
@@ -314,12 +313,12 @@ public class PipelineChainDownloaderTest {
final boolean isFinishedDownloading, final boolean isCancelled) {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.shouldContinueDownloading()).thenReturn(isFinishedDownloading);
when(syncTargetManager.findSyncTarget(Optional.empty()))
when(syncTargetManager.findSyncTarget())
.thenReturn(selectTargetFuture)
.thenReturn(new CompletableFuture<>());
chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
verify(syncTargetManager).findSyncTarget();
if (isCancelled) {
chainDownloader.cancel();
}
@@ -335,8 +334,7 @@ public class PipelineChainDownloaderTest {
private CompletableFuture<Void> expectPipelineStarted(
final SyncTarget syncTarget, final Pipeline<?> pipeline) {
final CompletableFuture<Void> pipelineFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget(Optional.empty()))
.thenReturn(completedFuture(syncTarget));
when(syncTargetManager.findSyncTarget()).thenReturn(completedFuture(syncTarget));
expectPipelineCreation(syncTarget, pipeline);
when(scheduler.startPipeline(pipeline)).thenReturn(pipelineFuture);
return pipelineFuture;

View File

@@ -40,7 +40,6 @@ import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.junit.After;
@@ -115,7 +114,7 @@ public class FullSyncTargetManagerTest {
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.MAX_VALUE, 4);
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
bestPeer.respond(responder);
assertThat(result)
@@ -132,7 +131,7 @@ public class FullSyncTargetManagerTest {
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.MAX_VALUE, 0);
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
bestPeer.respond(responder);
assertThat(result).isNotCompleted();
@@ -147,7 +146,7 @@ public class FullSyncTargetManagerTest {
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 20);
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
bestPeer.respond(responder);
@@ -164,7 +163,7 @@ public class FullSyncTargetManagerTest {
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 20);
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget(Optional.empty());
final CompletableFuture<SyncTarget> result = syncTargetManager.findSyncTarget();
bestPeer.respond(responder);