Adapt Fast sync, and Snap sync, to use finalized block as pivot after the Merge (#3655)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
This commit is contained in:
Fabio Di Fabio
2022-04-25 13:00:37 +02:00
committed by GitHub
parent 714c5383a5
commit e01f63bbfa
30 changed files with 919 additions and 141 deletions

View File

@@ -13,6 +13,7 @@
### Additions and Improvements
- Onchain node permissioning - log the enodeURL that was previously only throwing an IllegalStateException during the isPermitted check [#3697](https://github.com/hyperledger/besu/pull/3697)
- \[EXPERIMENTAL\] Add snapsync `--sync-mode="X_SNAP"` (only as client) [#3710](https://github.com/hyperledger/besu/pull/3710)
- Adapt Fast sync, and Snap sync, to use finalized block, from consensus layer, as pivot after the Merge [#3506](https://github.com/hyperledger/besu/issues/3506)
### Bug Fixes

View File

@@ -17,6 +17,9 @@ package org.hyperledger.besu.controller;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.FinalizedBlockHashSupplier;
import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.consensus.qbft.pki.PkiBlockCreationConfiguration;
import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.datatypes.Hash;
@@ -49,8 +52,12 @@ import org.hyperledger.besu.ethereum.eth.peervalidation.DaoForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromFinalizedBlock;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.TransitionPivotSelector;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
@@ -79,6 +86,7 @@ import java.nio.file.Path;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -329,8 +337,9 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
syncConfig.getComputationParallelism(),
metricsSystem);
final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final SyncState syncState = new SyncState(blockchain, ethPeers);
final boolean fastSyncEnabled = SyncMode.FAST.equals(syncConfig.getSyncMode());
final boolean fastSyncEnabled =
EnumSet.of(SyncMode.FAST, SyncMode.X_SNAP).contains(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fastSyncEnabled);
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
@@ -360,6 +369,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive);
final PivotBlockSelector pivotBlockSelector = createPivotSelector(protocolContext);
final Synchronizer synchronizer =
new DefaultSynchronizer(
syncConfig,
@@ -368,12 +379,13 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
worldStateStorage,
ethProtocolManager.getBlockBroadcaster(),
maybePruner,
ethProtocolManager.ethContext(),
ethContext,
syncState,
dataDirectory,
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()));
getFullSyncTerminationCondition(protocolContext.getBlockchain()),
pivotBlockSelector);
final MiningCoordinator miningCoordinator =
createMiningCoordinator(
@@ -418,6 +430,41 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
additionalPluginServices);
}
private PivotBlockSelector createPivotSelector(final ProtocolContext protocolContext) {
final PivotSelectorFromPeers pivotSelectorFromPeers = new PivotSelectorFromPeers(syncConfig);
final GenesisConfigOptions genesisConfigOptions = genesisConfig.getConfigOptions();
if (genesisConfigOptions.getTerminalTotalDifficulty().isPresent()) {
LOG.info(
"TTD difficulty is present, creating initial sync phase with transition to PoS support");
final MergeContext mergeContext = protocolContext.getConsensusContext(MergeContext.class);
final FinalizedBlockHashSupplier finalizedBlockHashSupplier =
new FinalizedBlockHashSupplier();
final long subscriptionId =
mergeContext.addNewForkchoiceMessageListener(finalizedBlockHashSupplier);
final Runnable unsubscribeFinalizedBlockHashListener =
() -> {
mergeContext.removeNewForkchoiceMessageListener(subscriptionId);
LOG.info("Initial sync done, unsubscribe finalized block hash supplier");
};
return new TransitionPivotSelector(
genesisConfigOptions,
finalizedBlockHashSupplier,
pivotSelectorFromPeers,
new PivotSelectorFromFinalizedBlock(
genesisConfigOptions,
finalizedBlockHashSupplier,
unsubscribeFinalizedBlockHashListener));
} else {
LOG.info("TTD difficulty is not present, creating initial sync phase for PoW");
return pivotSelectorFromPeers;
}
}
protected SyncTerminationCondition getFullSyncTerminationCondition(final Blockchain blockchain) {
return genesisConfig
.getConfigOptions()

View File

@@ -0,0 +1,45 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.consensus.merge.MergeContext.NewForkchoiceMessageListener;
import org.hyperledger.besu.datatypes.Hash;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FinalizedBlockHashSupplier
implements Supplier<Optional<Hash>>, NewForkchoiceMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(FinalizedBlockHashSupplier.class);
private volatile Optional<Hash> lastAnnouncedFinalizedBlockHash = Optional.empty();
@Override
public void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
lastAnnouncedFinalizedBlockHash = maybeFinalizedBlockHash;
LOG.debug("New finalized block hash announced {}", lastAnnouncedFinalizedBlockHash);
}
@Override
public Optional<Hash> get() {
return lastAnnouncedFinalizedBlockHash;
}
}

View File

@@ -15,6 +15,7 @@
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@@ -37,6 +38,11 @@ public interface MergeContext extends ConsensusContext {
void observeNewIsPostMergeState(final NewMergeStateCallback newMergeStateCallback);
long addNewForkchoiceMessageListener(
final NewForkchoiceMessageListener newForkchoiceMessageListener);
void removeNewForkchoiceMessageListener(final long subscriberId);
Difficulty getTerminalTotalDifficulty();
void setFinalized(final BlockHeader blockHeader);
@@ -53,7 +59,19 @@ public interface MergeContext extends ConsensusContext {
Optional<Block> retrieveBlockById(final PayloadIdentifier payloadId);
void fireNewForkchoiceMessageEvent(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash);
interface NewMergeStateCallback {
void onNewIsPostMergeState(final boolean newIsPostMergeState);
}
interface NewForkchoiceMessageListener {
void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash);
}
}

View File

@@ -15,6 +15,7 @@
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@@ -42,6 +43,8 @@ public class PostMergeContext implements MergeContext {
new AtomicReference<>(Optional.empty());
private final Subscribers<NewMergeStateCallback> newMergeStateCallbackSubscribers =
Subscribers.create();
private final Subscribers<NewForkchoiceMessageListener> newForkchoiceMessageCallbackSubscribers =
Subscribers.create();
private final EvictingQueue<PayloadTuple> blocksInProgress =
EvictingQueue.create(MAX_BLOCKS_IN_PROGRESS);
@@ -123,6 +126,26 @@ public class PostMergeContext implements MergeContext {
newMergeStateCallbackSubscribers.subscribe(newMergeStateCallback);
}
@Override
public long addNewForkchoiceMessageListener(
final NewForkchoiceMessageListener newForkchoiceMessageListener) {
return newForkchoiceMessageCallbackSubscribers.subscribe(newForkchoiceMessageListener);
}
@Override
public void removeNewForkchoiceMessageListener(final long subscriberId) {
newForkchoiceMessageCallbackSubscribers.unsubscribe(subscriberId);
}
@Override
public void fireNewForkchoiceMessageEvent(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
newForkchoiceMessageCallbackSubscribers.forEach(
cb -> cb.onNewForkchoiceMessage(headBlockHash, maybeFinalizedBlockHash, safeBlockHash));
}
@Override
public Difficulty getTerminalTotalDifficulty() {
return terminalTotalDifficulty.get();

View File

@@ -15,6 +15,7 @@
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@@ -71,6 +72,26 @@ public class TransitionContext implements MergeContext {
postMergeContext.observeNewIsPostMergeState(newMergeStateCallback);
}
@Override
public long addNewForkchoiceMessageListener(
final NewForkchoiceMessageListener newForkchoiceMessageListener) {
return postMergeContext.addNewForkchoiceMessageListener(newForkchoiceMessageListener);
}
@Override
public void removeNewForkchoiceMessageListener(final long subscriberId) {
postMergeContext.removeNewForkchoiceMessageListener(subscriberId);
}
@Override
public void fireNewForkchoiceMessageEvent(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
postMergeContext.fireNewForkchoiceMessageEvent(
headBlockHash, maybeFinalizedBlockHash, safeBlockHash);
}
@Override
public Difficulty getTerminalTotalDifficulty() {
return postMergeContext.getTerminalTotalDifficulty();

View File

@@ -66,6 +66,13 @@ public class EngineForkchoiceUpdated extends ExecutionEngineJsonRpcMethod {
final Optional<EnginePayloadAttributesParameter> optionalPayloadAttributes =
requestContext.getOptionalParameter(1, EnginePayloadAttributesParameter.class);
Optional<Hash> maybeFinalizedHash =
Optional.ofNullable(forkChoice.getFinalizedBlockHash())
.filter(finalized -> !Hash.ZERO.equals(finalized));
mergeContext.fireNewForkchoiceMessageEvent(
forkChoice.getHeadBlockHash(), maybeFinalizedHash, forkChoice.getSafeBlockHash());
if (mergeContext.isSyncing()) {
return new JsonRpcSuccessResponse(
requestContext.getRequest().getId(),
@@ -80,14 +87,10 @@ public class EngineForkchoiceUpdated extends ExecutionEngineJsonRpcMethod {
Optional<BlockHeader> newHead =
protocolContext.getBlockchain().getBlockHeader(forkChoice.getHeadBlockHash());
Optional<Hash> finalizedHash =
Optional.ofNullable(forkChoice.getFinalizedBlockHash())
.filter(finalized -> !Hash.ZERO.equals(finalized));
Optional<BlockHeader> finalizedHead =
finalizedHash.flatMap(protocolContext.getBlockchain()::getBlockHeader);
maybeFinalizedHash.flatMap(protocolContext.getBlockchain()::getBlockHeader);
if (newHead.isPresent() && (finalizedHash.isEmpty() || finalizedHead.isPresent())) {
if (newHead.isPresent() && (maybeFinalizedHash.isEmpty() || finalizedHead.isPresent())) {
// TODO: post-merge cleanup, this should be unnecessary after merge
if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newHead.get())) {

View File

@@ -21,6 +21,7 @@ import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.Executi
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.ExecutionEngineJsonRpcMethod.EngineStatus.VALID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.consensus.merge.MergeContext;
@@ -228,6 +229,10 @@ public class EngineForkchoiceUpdatedTest {
assertThat(res.getPayloadStatus().getLatestValidHash()).isEmpty();
assertThat(res.getPayloadId()).isNull();
}
// assert that listeners are always notified
verify(mergeContext).fireNewForkchoiceMessageEvent(mockHash, Optional.of(mockHash), mockHash);
return res;
}

View File

@@ -51,10 +51,12 @@ public class DefaultSynchronizer implements Synchronizer {
private final Optional<Pruner> maybePruner;
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final BlockPropagationManager blockPropagationManager;
private final Optional<BlockPropagationManager> blockPropagationManager;
private final Optional<FastSyncDownloader<?>> fastSyncDownloader;
private final FullSyncDownloader fullSyncDownloader;
private final Optional<FullSyncDownloader> fullSyncDownloader;
private final ProtocolContext protocolContext;
private final PivotBlockSelector pivotBlockSelector;
private final SyncTerminationCondition terminationCondition;
public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
@@ -68,11 +70,14 @@ public class DefaultSynchronizer implements Synchronizer {
final Path dataDirectory,
final Clock clock,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition) {
final SyncTerminationCondition terminationCondition,
final PivotBlockSelector pivotBlockSelector) {
this.maybePruner = maybePruner;
this.syncState = syncState;
this.pivotBlockSelector = pivotBlockSelector;
this.protocolContext = protocolContext;
this.terminationCondition = terminationCondition;
ChainHeadTracker.trackChainHeadForPeers(
ethContext,
protocolSchedule,
@@ -81,29 +86,36 @@ public class DefaultSynchronizer implements Synchronizer {
metricsSystem);
this.blockPropagationManager =
new BlockPropagationManager(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
new PendingBlocksManager(syncConfig),
metricsSystem,
blockBroadcaster);
terminationCondition.shouldStopDownload()
? Optional.empty()
: Optional.of(
new BlockPropagationManager(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
new PendingBlocksManager(syncConfig),
metricsSystem,
blockBroadcaster));
this.fullSyncDownloader =
new FullSyncDownloader(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
terminationCondition);
terminationCondition.shouldStopDownload()
? Optional.empty()
: Optional.of(
new FullSyncDownloader(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
terminationCondition));
if (SyncMode.X_SNAP.equals(syncConfig.getSyncMode())) {
this.fastSyncDownloader =
SnapDownloaderFactory.createSnapDownloader(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
@@ -116,6 +128,7 @@ public class DefaultSynchronizer implements Synchronizer {
} else {
this.fastSyncDownloader =
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
@@ -142,19 +155,23 @@ public class DefaultSynchronizer implements Synchronizer {
private TrailingPeerRequirements calculateTrailingPeerRequirements() {
return fastSyncDownloader
.flatMap(FastSyncDownloader::calculateTrailingPeerRequirements)
.orElseGet(fullSyncDownloader::calculateTrailingPeerRequirements);
.orElse(
fullSyncDownloader
.map(FullSyncDownloader::calculateTrailingPeerRequirements)
.orElse(TrailingPeerRequirements.UNRESTRICTED));
}
@Override
public CompletableFuture<Void> start() {
if (running.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
blockPropagationManager.ifPresent(BlockPropagationManager::start);
CompletableFuture<Void> future;
if (fastSyncDownloader.isPresent()) {
future = fastSyncDownloader.get().start().thenCompose(this::handleFastSyncResult);
} else {
syncState.markInitialSyncPhaseAsDone();
future = startFullSync();
}
future = future.thenApply(this::finalizeSync);
@@ -169,9 +186,9 @@ public class DefaultSynchronizer implements Synchronizer {
if (running.compareAndSet(true, false)) {
LOG.info("Stopping synchronizer");
fastSyncDownloader.ifPresent(FastSyncDownloader::stop);
fullSyncDownloader.stop();
fullSyncDownloader.ifPresent(FullSyncDownloader::stop);
maybePruner.ifPresent(Pruner::stop);
blockPropagationManager.stop();
blockPropagationManager.ifPresent(BlockPropagationManager::stop);
}
}
@@ -196,12 +213,24 @@ public class DefaultSynchronizer implements Synchronizer {
LOG.info(
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
return startFullSync();
pivotBlockSelector.close();
syncState.markInitialSyncPhaseAsDone();
return terminationCondition.shouldContinueDownload()
? startFullSync()
: CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> startFullSync() {
maybePruner.ifPresent(Pruner::start);
return fullSyncDownloader.start();
return fullSyncDownloader
.map(FullSyncDownloader::start)
.orElse(CompletableFuture.completedFuture(null))
.thenRun(
() -> {
if (terminationCondition.shouldStopDownload()) {
syncState.setReachedTerminalDifficulty(true);
}
});
}
@Override
@@ -240,7 +269,7 @@ public class DefaultSynchronizer implements Synchronizer {
private Void finalizeSync(final Void unused) {
LOG.info("Stopping block propagation.");
blockPropagationManager.stop();
blockPropagationManager.ifPresent(BlockPropagationManager::stop);
LOG.info("Stopping the pruner.");
maybePruner.ifPresent(Pruner::stop);
running.set(false);

View File

@@ -107,7 +107,7 @@ public class PipelineChainDownloader implements ChainDownloader {
@SuppressWarnings("unused") final Void result) {
syncState.clearSyncTarget();
if (syncTargetManager.shouldContinueDownloading()
&& !syncState.hasReachedTerminalDifficulty().orElse(false)) {
&& !syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)) {
return performDownload();
} else {
LOG.info("PipelineChain download complete");
@@ -149,7 +149,7 @@ public class PipelineChainDownloader implements ChainDownloader {
}
private synchronized CompletionStage<Void> startDownloadForSyncTarget(final SyncTarget target) {
if (cancelled.get() || syncState.hasReachedTerminalDifficulty().orElse(false)) {
if (cancelled.get() || syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)) {
return CompletableFuture.failedFuture(
new CancellationException("Chain download was cancelled"));
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import java.util.Optional;
public interface PivotBlockSelector {
Optional<FastSyncState> selectNewPivotBlock(EthPeer peer);
default void close() {
// do nothing by default
}
default long getMinRequiredBlockNumber() {
return 0L;
}
}

View File

@@ -178,8 +178,9 @@ public class BackwardSyncContext {
return getBlockValidator(block.getHeader().getNumber());
}
public boolean isOnTTD() {
return syncState.hasReachedTerminalDifficulty().orElse(false);
public boolean isReady() {
return syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)
&& syncState.isInitialSyncPhaseDone();
}
public CompletableFuture<Void> stop() {
@@ -191,7 +192,7 @@ public class BackwardSyncContext {
if (firstHash.isPresent()) {
return executeSyncStep(firstHash.get());
}
if (!isOnTTD()) {
if (!isReady()) {
return waitForTTD().thenCompose(this::executeNextStep);
}
final Optional<BlockHeader> firstAncestorHeader = backwardChain.getFirstAncestorHeader();
@@ -234,23 +235,23 @@ public class BackwardSyncContext {
final long id =
syncState.subscribeTTDReached(
reached -> {
if (reached) {
if (reached && syncState.isInitialSyncPhaseDone()) {
latch.countDown();
}
});
return CompletableFuture.runAsync(
() -> {
try {
if (!isOnTTD()) {
LOG.info("Waiting for TTD...");
if (!isReady()) {
LOG.info("Waiting for preconditions...");
final boolean await = latch.await(2, TimeUnit.MINUTES);
if (await) {
LOG.info("TTD reached...");
LOG.info("Preconditions meet...");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BackwardSyncException("Wait for TTD was interrupted");
throw new BackwardSyncException("Wait for TTD preconditions interrupted");
} finally {
syncState.unsubscribeTTDReached(id);
}

View File

@@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockImporter;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
@@ -65,6 +67,7 @@ public class FastImportBlocksStep implements Consumer<List<BlockWithReceipts>> {
blockWithReceipts.getHeader().getNumber(),
blockWithReceipts.getHash());
}
traceLambda(LOG, "Imported block {}", blockWithReceipts.getBlock()::toLogString);
}
if (logStartBlock.isEmpty()) {
logStartBlock = OptionalLong.of(blocksWithReceipts.get(0).getNumber());

View File

@@ -17,16 +17,18 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerLimiter;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
@@ -35,6 +37,7 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@@ -52,6 +55,7 @@ public class FastSyncActions {
private final ProtocolContext protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final PivotBlockSelector pivotBlockSelector;
private final MetricsSystem metricsSystem;
private final Counter pivotBlockSelectionCounter;
private final AtomicLong pivotBlockGauge = new AtomicLong(0);
@@ -63,6 +67,7 @@ public class FastSyncActions {
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final PivotBlockSelector pivotBlockSelector,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.worldStateStorage = worldStateStorage;
@@ -70,6 +75,7 @@ public class FastSyncActions {
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.syncState = syncState;
this.pivotBlockSelector = pivotBlockSelector;
this.metricsSystem = metricsSystem;
pivotBlockSelectionCounter =
@@ -125,42 +131,40 @@ public class FastSyncActions {
public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
return fastSyncState.hasPivotBlockHeader()
? completedFuture(fastSyncState)
: selectPivotBlockFromPeers();
: selectNewPivotBlock();
}
private CompletableFuture<FastSyncState> selectPivotBlockFromPeers() {
private CompletableFuture<FastSyncState> selectNewPivotBlock() {
return selectBestPeer()
.map(
bestPeer ->
pivotBlockSelector
.selectNewPivotBlock(bestPeer)
.map(CompletableFuture::completedFuture)
.orElse(null))
.orElseGet(this::retrySelectPivotBlockAfterDelay);
}
private Optional<EthPeer> selectBestPeer() {
return ethContext
.getEthPeers()
.bestPeerMatchingCriteria(this::canPeerDeterminePivotBlock)
// Only select a pivot block number when we have a minimum number of height estimates
.filter(
peer -> {
final long peerCount = countPeersThatCanDeterminePivotBlock();
final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount();
if (peerCount < minPeerCount) {
LOG.info(
"Waiting for valid peers with chain height information. {} / {} required peers currently available.",
peerCount,
minPeerCount);
return false;
}
return true;
})
.map(
peer -> {
final long pivotBlockNumber =
peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance();
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) {
// Peer's chain isn't long enough, return an empty value so we can try again.
LOG.info("Waiting for peers with sufficient chain height");
return null;
}
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
pivotBlockSelectionCounter.inc();
pivotBlockGauge.set(pivotBlockNumber);
return completedFuture(new FastSyncState(pivotBlockNumber));
})
.orElseGet(this::retrySelectPivotBlockAfterDelay);
.filter(unused -> enoughFastSyncPeersArePresent());
}
private boolean enoughFastSyncPeersArePresent() {
final long peerCount = countPeersThatCanDeterminePivotBlock();
final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount();
if (peerCount < minPeerCount) {
LOG.info(
"Waiting for valid peers with chain height information. {} / {} required peers currently available.",
peerCount,
minPeerCount);
return false;
}
return true;
}
private long countPeersThatCanDeterminePivotBlock() {
@@ -198,22 +202,41 @@ public class FastSyncActions {
trailingPeerLimiter.enforceTrailingPeerLimit();
return waitForPeers(syncConfig.getFastSyncMinimumPeerCount())
.thenCompose(ignore -> selectPivotBlockFromPeers());
.thenCompose(ignore -> selectNewPivotBlock());
}
public CompletableFuture<FastSyncState> downloadPivotBlockHeader(
final FastSyncState currentState) {
if (currentState.getPivotBlockHeader().isPresent()) {
return internalDownloadPivotBlockHeader(currentState).thenApply(this::updateStats);
}
private CompletableFuture<FastSyncState> internalDownloadPivotBlockHeader(
final FastSyncState currentState) {
if (currentState.hasPivotBlockHeader()) {
return completedFuture(currentState);
}
return new PivotBlockRetriever(
protocolSchedule,
ethContext,
metricsSystem,
currentState.getPivotBlockNumber().getAsLong(),
syncConfig.getFastSyncMinimumPeerCount(),
syncConfig.getFastSyncPivotDistance())
.downloadPivotBlockHeader();
return currentState
.getPivotBlockHash()
.map(this::downloadPivotBlockHeader)
.orElseGet(
() ->
new PivotBlockRetriever(
protocolSchedule,
ethContext,
metricsSystem,
currentState.getPivotBlockNumber().getAsLong(),
syncConfig.getFastSyncMinimumPeerCount(),
syncConfig.getFastSyncPivotDistance())
.downloadPivotBlockHeader());
}
private FastSyncState updateStats(final FastSyncState fastSyncState) {
pivotBlockSelectionCounter.inc();
fastSyncState
.getPivotBlockHeader()
.ifPresent(blockHeader -> pivotBlockGauge.set(blockHeader.getNumber()));
return fastSyncState;
}
public ChainDownloader createChainDownloader(final FastSyncState currentState) {
@@ -227,4 +250,21 @@ public class FastSyncActions {
metricsSystem,
currentState);
}
private CompletableFuture<FastSyncState> downloadPivotBlockHeader(final Hash hash) {
return RetryingGetHeaderFromPeerByHashTask.byHash(
protocolSchedule,
ethContext,
hash,
pivotBlockSelector.getMinRequiredBlockNumber(),
metricsSystem)
.getHeader()
.thenApply(
blockHeader -> {
LOG.info(
"Successfully downloaded pivot block header by hash: {}",
blockHeader.toLogString());
return new FastSyncState(blockHeader);
});
}
}

View File

@@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import java.util.Objects;
@@ -22,28 +23,39 @@ import java.util.OptionalLong;
public class FastSyncState {
public static FastSyncState EMPTY_SYNC_STATE =
new FastSyncState(OptionalLong.empty(), Optional.empty());
public static final FastSyncState EMPTY_SYNC_STATE = new FastSyncState();
private OptionalLong pivotBlockNumber;
private Optional<Hash> pivotBlockHash;
private Optional<BlockHeader> pivotBlockHeader;
public FastSyncState() {
pivotBlockNumber = OptionalLong.empty();
pivotBlockHash = Optional.empty();
pivotBlockHeader = Optional.empty();
}
public FastSyncState(final long pivotBlockNumber) {
this(OptionalLong.of(pivotBlockNumber), Optional.empty());
this(OptionalLong.of(pivotBlockNumber), Optional.empty(), Optional.empty());
}
public FastSyncState(final Hash pivotBlockHash) {
this(OptionalLong.empty(), Optional.of(pivotBlockHash), Optional.empty());
}
public FastSyncState(final BlockHeader pivotBlockHeader) {
this(OptionalLong.of(pivotBlockHeader.getNumber()), Optional.of(pivotBlockHeader));
this(
OptionalLong.of(pivotBlockHeader.getNumber()),
Optional.of(pivotBlockHeader.getHash()),
Optional.of(pivotBlockHeader));
}
protected FastSyncState(
final OptionalLong pivotBlockNumber, final Optional<BlockHeader> pivotBlockHeader) {
final OptionalLong pivotBlockNumber,
final Optional<Hash> pivotBlockHash,
final Optional<BlockHeader> pivotBlockHeader) {
this.pivotBlockNumber = pivotBlockNumber;
this.pivotBlockHash = pivotBlockHash;
this.pivotBlockHeader = pivotBlockHeader;
}
@@ -51,6 +63,10 @@ public class FastSyncState {
return pivotBlockNumber;
}
public Optional<Hash> getPivotBlockHash() {
return pivotBlockHash;
}
public Optional<BlockHeader> getPivotBlockHeader() {
return pivotBlockHeader;
}
@@ -61,6 +77,7 @@ public class FastSyncState {
public void setCurrentHeader(final BlockHeader header) {
pivotBlockNumber = OptionalLong.of(header.getNumber());
pivotBlockHash = Optional.of(header.getHash());
pivotBlockHeader = Optional.of(header);
}
@@ -70,12 +87,13 @@ public class FastSyncState {
if (o == null || getClass() != o.getClass()) return false;
FastSyncState that = (FastSyncState) o;
return Objects.equals(pivotBlockNumber, that.pivotBlockNumber)
&& Objects.equals(pivotBlockHash, that.pivotBlockHash)
&& Objects.equals(pivotBlockHeader, that.pivotBlockHeader);
}
@Override
public int hashCode() {
return Objects.hash(pivotBlockNumber, pivotBlockHeader);
return Objects.hash(pivotBlockNumber, pivotBlockHash, pivotBlockHeader);
}
@Override
@@ -83,6 +101,8 @@ public class FastSyncState {
return "FastSyncState{"
+ "pivotBlockNumber="
+ pivotBlockNumber
+ "pivotBlockHash="
+ pivotBlockHash
+ ", pivotBlockHeader="
+ pivotBlockHeader
+ '}';

View File

@@ -0,0 +1,69 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PivotSelectorFromFinalizedBlock implements PivotBlockSelector {
private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromFinalizedBlock.class);
private final GenesisConfigOptions genesisConfig;
private final Supplier<Optional<Hash>> finalizedBlockHashSupplier;
private final Runnable cleanupAction;
public PivotSelectorFromFinalizedBlock(
final GenesisConfigOptions genesisConfig,
final Supplier<Optional<Hash>> finalizedBlockHashSupplier,
final Runnable cleanupAction) {
this.genesisConfig = genesisConfig;
this.finalizedBlockHashSupplier = finalizedBlockHashSupplier;
this.cleanupAction = cleanupAction;
}
@Override
public Optional<FastSyncState> selectNewPivotBlock(final EthPeer peer) {
final Optional<Hash> maybeHash = finalizedBlockHashSupplier.get();
if (maybeHash.isPresent()) {
return Optional.of(selectLastFinalizedBlockAsPivot(maybeHash.get()));
}
LOG.info("No finalized block hash announced yet");
return Optional.empty();
}
private FastSyncState selectLastFinalizedBlockAsPivot(final Hash finalizedHash) {
LOG.info("Returning finalized block hash as pivot: {}", finalizedHash);
return new FastSyncState(finalizedHash);
}
@Override
public void close() {
cleanupAction.run();
}
@Override
public long getMinRequiredBlockNumber() {
return genesisConfig.getTerminalBlockNumber().orElse(0L);
}
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PivotSelectorFromPeers implements PivotBlockSelector {
private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromPeers.class);
private final SynchronizerConfiguration syncConfig;
public PivotSelectorFromPeers(final SynchronizerConfiguration syncConfig) {
this.syncConfig = syncConfig;
}
@Override
public Optional<FastSyncState> selectNewPivotBlock(final EthPeer peer) {
return fromBestPeer(peer);
}
private Optional<FastSyncState> fromBestPeer(final EthPeer peer) {
final long pivotBlockNumber =
peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance();
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) {
// Peer's chain isn't long enough, return an empty value so we can try again.
LOG.info("Waiting for peers with sufficient chain height");
return Optional.empty();
}
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
return Optional.of(new FastSyncState(pivotBlockNumber));
}
}

View File

@@ -0,0 +1,95 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransitionPivotSelector implements PivotBlockSelector {
private static final Logger LOG = LoggerFactory.getLogger(TransitionPivotSelector.class);
private final Difficulty totalTerminalDifficulty;
private final Supplier<Optional<Hash>> finalizedBlockHashSupplier;
private final PivotBlockSelector pivotSelectorFromPeers;
private final PivotBlockSelector pivotSelectorFromFinalizedBlock;
public TransitionPivotSelector(
final GenesisConfigOptions genesisConfig,
final Supplier<Optional<Hash>> finalizedBlockHashSupplier,
final PivotBlockSelector pivotSelectorFromPeers,
final PivotBlockSelector pivotSelectorFromFinalizedBlock) {
this.totalTerminalDifficulty =
genesisConfig
.getTerminalTotalDifficulty()
.map(Difficulty::of)
.orElseThrow(
() ->
new IllegalArgumentException(
"This class can only be used when TTD is present"));
this.finalizedBlockHashSupplier = finalizedBlockHashSupplier;
this.pivotSelectorFromPeers = pivotSelectorFromPeers;
this.pivotSelectorFromFinalizedBlock = pivotSelectorFromFinalizedBlock;
}
@Override
public Optional<FastSyncState> selectNewPivotBlock(final EthPeer peer) {
return routeDependingOnTotalTerminalDifficulty(peer);
}
private Optional<FastSyncState> routeDependingOnTotalTerminalDifficulty(final EthPeer peer) {
Difficulty bestPeerEstDifficulty = peer.chainState().getEstimatedTotalDifficulty();
if (finalizedBlockHashSupplier.get().isPresent()) {
LOG.info("A finalized block is present, use it as pivot");
return pivotSelectorFromFinalizedBlock.selectNewPivotBlock(peer);
}
if (bestPeerEstDifficulty.greaterOrEqualThan(totalTerminalDifficulty)) {
LOG.info(
"Chain has reached TTD, best peer has estimated difficulty {},"
+ " select pivot from finalized block",
bestPeerEstDifficulty);
return pivotSelectorFromFinalizedBlock.selectNewPivotBlock(peer);
}
LOG.info(
"Chain has not yet reached TTD, best peer has estimated difficulty {},"
+ " select pivot from peers",
bestPeerEstDifficulty);
return pivotSelectorFromPeers.selectNewPivotBlock(peer);
}
@Override
public void close() {
pivotSelectorFromFinalizedBlock.close();
pivotSelectorFromPeers.close();
}
@Override
public long getMinRequiredBlockNumber() {
return pivotSelectorFromFinalizedBlock.getMinRequiredBlockNumber();
}
}

View File

@@ -17,11 +17,12 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.metrics.RunnableTimedCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.services.tasks.Task;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import org.slf4j.Logger;
@@ -29,24 +30,23 @@ import org.slf4j.LoggerFactory;
public class CompleteTaskStep {
private static final Logger LOG = LoggerFactory.getLogger(CompleteTaskStep.class);
private static final int DISPLAY_PROGRESS_STEP = 100000;
private final RunnableCounter completedRequestsCounter;
private final RunnableTimedCounter completedRequestsCounter;
private final Counter retriedRequestsCounter;
private final LongSupplier worldStatePendingRequestsCurrentSupplier;
private long lastLogAt = System.currentTimeMillis();
public CompleteTaskStep(
final MetricsSystem metricsSystem,
final LongSupplier worldStatePendingRequestsCurrentSupplier) {
this.worldStatePendingRequestsCurrentSupplier = worldStatePendingRequestsCurrentSupplier;
completedRequestsCounter =
new RunnableCounter(
new RunnableTimedCounter(
metricsSystem.createCounter(
BesuMetricCategory.SYNCHRONIZER,
"world_state_completed_requests_total",
"Total number of node data requests completed as part of fast sync world state download"),
this::displayWorldStateSyncProgress,
DISPLAY_PROGRESS_STEP);
1,
TimeUnit.MINUTES);
retriedRequestsCounter =
metricsSystem.createCounter(
BesuMetricCategory.SYNCHRONIZER,
@@ -72,14 +72,10 @@ public class CompleteTaskStep {
}
private void displayWorldStateSyncProgress() {
final long now = System.currentTimeMillis();
if (now - lastLogAt > 10 * 1000L) {
LOG.info(
"Downloaded {} world state nodes. At least {} nodes remaining.",
getCompletedRequests(),
worldStatePendingRequestsCurrentSupplier.getAsLong());
lastLogAt = now;
}
LOG.info(
"Downloaded {} world state nodes. At least {} nodes remaining.",
getCompletedRequests(),
worldStatePendingRequestsCurrentSupplier.getAsLong());
}
long getCompletedRequests() {

View File

@@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
@@ -51,6 +52,7 @@ public class FastDownloaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(FastDownloaderFactory.class);
public static Optional<FastSyncDownloader<?>> create(
final PivotBlockSelector pivotBlockSelector,
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
final ProtocolSchedule protocolSchedule,
@@ -118,6 +120,7 @@ public class FastDownloaderFactory {
protocolContext,
ethContext,
syncState,
pivotBlockSelector,
metricsSystem),
worldStateStorage,
worldStateDownloader,

View File

@@ -35,7 +35,6 @@ public class FullSyncDownloader {
private final SynchronizerConfiguration syncConfig;
private final ProtocolContext protocolContext;
private final SyncState syncState;
private final SyncTerminationCondition terminationCondition;
public FullSyncDownloader(
final SynchronizerConfiguration syncConfig,
@@ -48,7 +47,6 @@ public class FullSyncDownloader {
this.syncConfig = syncConfig;
this.protocolContext = protocolContext;
this.syncState = syncState;
this.terminationCondition = terminationCondition;
this.chainDownloader =
FullSyncChainDownloader.create(
@@ -63,15 +61,7 @@ public class FullSyncDownloader {
public CompletableFuture<Void> start() {
LOG.info("Starting full sync.");
return chainDownloader
.start()
.thenApply(
unused -> {
if (terminationCondition.shouldStopDownload()) {
syncState.setReachedTerminalDifficulty(true);
}
return null;
});
return chainDownloader.start();
}
public void stop() {

View File

@@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
@@ -45,6 +46,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(SnapDownloaderFactory.class);
public static Optional<FastSyncDownloader<?>> createSnapDownloader(
final PivotBlockSelector pivotBlockSelector,
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
final ProtocolSchedule protocolSchedule,
@@ -108,6 +110,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
protocolContext,
ethContext,
syncState,
pivotBlockSelector,
metricsSystem),
worldStateStorage,
snapWorldStateDownloader,

View File

@@ -23,7 +23,10 @@ public class SnapSyncState extends FastSyncState {
private boolean isHealInProgress;
public SnapSyncState(final FastSyncState fastSyncState) {
super(fastSyncState.getPivotBlockNumber(), fastSyncState.getPivotBlockHeader());
super(
fastSyncState.getPivotBlockNumber(),
fastSyncState.getPivotBlockHash(),
fastSyncState.getPivotBlockHeader());
}
public boolean isHealInProgress() {

View File

@@ -49,10 +49,18 @@ public class SyncState {
private Optional<WorldStateDownloadStatus> worldStateDownloadStatus = Optional.empty();
private Optional<Long> newPeerListenerId;
private Optional<Boolean> reachedTerminalDifficulty = Optional.empty();
private volatile boolean isInitialSyncPhaseDone;
public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
this(blockchain, ethPeers, false);
}
public SyncState(
final Blockchain blockchain, final EthPeers ethPeers, final boolean hasInitialSyncPhase) {
this.blockchain = blockchain;
this.ethPeers = ethPeers;
isInitialSyncPhaseDone = !hasInitialSyncPhase;
blockchain.observeBlockAdded(
event -> {
if (event.isNewCanonicalHead()) {
@@ -157,7 +165,10 @@ public class SyncState {
}
public Optional<Boolean> hasReachedTerminalDifficulty() {
return reachedTerminalDifficulty;
if (isInitialSyncPhaseDone) {
return reachedTerminalDifficulty;
}
return Optional.of(Boolean.FALSE);
}
private boolean isInSync(
@@ -165,7 +176,8 @@ public class SyncState {
final Optional<ChainHeadEstimate> syncTargetChain,
final Optional<ChainHeadEstimate> bestPeerChain,
final long syncTolerance) {
return reachedTerminalDifficulty.orElse(true)
return isInitialSyncPhaseDone
&& reachedTerminalDifficulty.orElse(true)
// Sync target may be temporarily empty while we switch sync targets during a sync, so
// check both the sync target and our best peer to determine if we're in sync or not
&& isInSync(localChain, syncTargetChain, syncTolerance)
@@ -277,4 +289,12 @@ public class SyncState {
.forEach(
(syncTracker) -> syncTracker.checkState(localChain, syncTargetChain, bestPeerChain));
}
public void markInitialSyncPhaseAsDone() {
isInitialSyncPhaseDone = true;
}
public boolean isInitialSyncPhaseDone() {
return isInitialSyncPhaseDone;
}
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.tasks;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractGetHeadersFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import com.google.common.annotations.VisibleForTesting;
public class RetryingGetHeaderFromPeerByHashTask
extends AbstractRetryingPeerTask<List<BlockHeader>> {
private final Hash referenceHash;
private final ProtocolSchedule protocolSchedule;
private final long minimumRequiredBlockNumber;
@VisibleForTesting
RetryingGetHeaderFromPeerByHashTask(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final MetricsSystem metricsSystem) {
super(ethContext, 3, List::isEmpty, metricsSystem);
this.protocolSchedule = protocolSchedule;
this.minimumRequiredBlockNumber = minimumRequiredBlockNumber;
checkNotNull(referenceHash);
this.referenceHash = referenceHash;
}
public static RetryingGetHeaderFromPeerByHashTask byHash(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final MetricsSystem metricsSystem) {
return new RetryingGetHeaderFromPeerByHashTask(
protocolSchedule, ethContext, referenceHash, minimumRequiredBlockNumber, metricsSystem);
}
@Override
protected CompletableFuture<List<BlockHeader>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
final AbstractGetHeadersFromPeerTask task =
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule,
getEthContext(),
referenceHash,
minimumRequiredBlockNumber,
getMetricsSystem());
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
if (!peerResult.getResult().isEmpty()) {
result.complete(peerResult.getResult());
}
return peerResult.getResult();
});
}
public CompletableFuture<BlockHeader> getHeader() {
return run().thenApply(singletonList -> singletonList.get(0));
}
}

View File

@@ -144,7 +144,7 @@ public class BackwardSyncContextTest {
ethContext,
syncState,
backwardChain));
doReturn(true).when(context).isOnTTD();
doReturn(true).when(context).isReady();
doReturn(2).when(context).getBatchSize();
}
@@ -222,7 +222,8 @@ public class BackwardSyncContextTest {
@Test
public void shouldWaitWhenTTDNotReached()
throws ExecutionException, InterruptedException, TimeoutException {
doReturn(false).when(context).isOnTTD();
doReturn(false).when(context).isReady();
when(syncState.isInitialSyncPhaseDone()).thenReturn(Boolean.TRUE);
when(syncState.subscribeTTDReached(any())).thenReturn(88L);
final CompletableFuture<Void> voidCompletableFuture = context.waitForTTD();
@@ -241,7 +242,7 @@ public class BackwardSyncContextTest {
@Test
public void shouldNotWaitWhenTTDReached()
throws ExecutionException, InterruptedException, TimeoutException {
doReturn(true).when(context).isOnTTD();
doReturn(true).when(context).isReady();
when(syncState.subscribeTTDReached(any())).thenReturn(88L);
final CompletableFuture<Void> voidCompletableFuture = context.waitForTTD();
voidCompletableFuture.get(1, TimeUnit.SECONDS);
@@ -261,7 +262,7 @@ public class BackwardSyncContextTest {
}
@Test
public void shouldFinishWhenWorkIsDonw() {
public void shouldFinishWhenWorkIsDone() {
final CompletableFuture<Void> completableFuture = context.executeNextStep(null);
assertThat(completableFuture.isDone()).isTrue();

View File

@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloaderFactory;
@@ -58,14 +59,16 @@ public class FastDownloaderFactoryTest {
@Mock private SyncState syncState;
@Mock private Clock clock;
@Mock private Path dataDirectory;
@Mock private PivotBlockSelector pivotBlockSelector;
@SuppressWarnings("unchecked")
@Test(expected = IllegalStateException.class)
public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete() throws NoSuchFieldException {
public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete() {
initDataDirectory(true);
when(syncConfig.getSyncMode()).thenReturn(SyncMode.FULL);
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
@@ -79,12 +82,13 @@ public class FastDownloaderFactoryTest {
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void shouldNotThrowIfSyncModeChangedWhileFastSyncComplete() throws NoSuchFieldException {
public void shouldNotThrowIfSyncModeChangedWhileFastSyncComplete() {
initDataDirectory(false);
when(syncConfig.getSyncMode()).thenReturn(SyncMode.FULL);
final Optional result =
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
@@ -108,6 +112,7 @@ public class FastDownloaderFactoryTest {
when(syncConfig.getSyncMode()).thenReturn(SyncMode.FAST);
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
@@ -137,6 +142,7 @@ public class FastDownloaderFactoryTest {
assertThat(Files.exists(stateQueueDir)).isTrue();
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
@@ -168,6 +174,7 @@ public class FastDownloaderFactoryTest {
Assertions.assertThatThrownBy(
() ->
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,

View File

@@ -19,6 +19,8 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@@ -33,6 +35,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@@ -45,6 +48,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -92,8 +96,7 @@ public class FastSyncActionsTest {
blockchainSetupUtil.getWorldArchive(),
blockchainSetupUtil.getTransactionPool(),
EthProtocolConfiguration.defaultConfig());
fastSyncActions = createFastSyncActions(syncConfig);
when(worldStateStorage.isWorldStateAvailable(any(), any())).thenReturn(true);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
}
@Test
@@ -136,7 +139,7 @@ public class FastSyncActionsTest {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
@@ -151,7 +154,7 @@ public class FastSyncActionsTest {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(1000), 5500);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(2000), 4000);
@@ -168,7 +171,7 @@ public class FastSyncActionsTest {
final int minPeers = 2;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
@@ -193,7 +196,7 @@ public class FastSyncActionsTest {
final int minPeers = 3;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@@ -238,7 +241,7 @@ public class FastSyncActionsTest {
final PeerValidator validator = mock(PeerValidator.class);
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@@ -299,7 +302,7 @@ public class FastSyncActionsTest {
final int peerCount = minPeers + 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@@ -345,7 +348,7 @@ public class FastSyncActionsTest {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
final long pivotDistance = syncConfig.getFastSyncPivotDistance();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@@ -396,7 +399,7 @@ public class FastSyncActionsTest {
@Test
public void downloadPivotBlockHeaderShouldRetrievePivotBlockHeader() {
syncConfig = SynchronizerConfiguration.builder().fastSyncMinimumPeerCount(1).build();
fastSyncActions = createFastSyncActions(syncConfig);
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001);
final CompletableFuture<FastSyncState> result =
@@ -409,7 +412,32 @@ public class FastSyncActionsTest {
assertThat(result).isCompletedWithValue(new FastSyncState(blockchain.getBlockHeader(1).get()));
}
private FastSyncActions createFastSyncActions(final SynchronizerConfiguration syncConfig) {
@Test
public void downloadPivotBlockHeaderShouldRetrievePivotBlockHash() {
syncConfig = SynchronizerConfiguration.builder().fastSyncMinimumPeerCount(1).build();
GenesisConfigOptions genesisConfig = mock(GenesisConfigOptions.class);
when(genesisConfig.getTerminalBlockNumber()).thenReturn(OptionalLong.of(10L));
final Optional<Hash> finalizedHash = blockchain.getBlockHashByNumber(2L);
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromFinalizedBlock(genesisConfig, () -> finalizedHash, () -> {}));
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001);
final CompletableFuture<FastSyncState> result =
fastSyncActions.downloadPivotBlockHeader(new FastSyncState(finalizedHash.get()));
assertThat(result).isNotCompleted();
final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
peer.respond(responder);
assertThat(result).isCompletedWithValue(new FastSyncState(blockchain.getBlockHeader(2).get()));
}
private FastSyncActions createFastSyncActions(
final SynchronizerConfiguration syncConfig, final PivotBlockSelector pivotBlockSelector) {
final ProtocolSchedule protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
final ProtocolContext protocolContext = blockchainSetupUtil.getProtocolContext();
final EthContext ethContext = ethProtocolManager.ethContext();
@@ -419,7 +447,8 @@ public class FastSyncActionsTest {
protocolSchedule,
protocolContext,
ethContext,
new SyncState(blockchain, ethContext.getEthPeers()),
new SyncState(blockchain, ethContext.getEthPeers(), true),
pivotBlockSelector,
new NoOpMetricsSystem());
}
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.metrics;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** Counter that triggers a specific task if the specified interval has elapsed. */
public class RunnableTimedCounter implements Counter {
private final Counter backedCounter;
private final Runnable task;
private final long intervalMillis;
private final AtomicLong stepCounter;
private volatile long nextExecutionAtMillis;
public RunnableTimedCounter(
final Counter backedCounter, final Runnable task, final long interval, final TimeUnit unit) {
this.backedCounter = backedCounter;
this.task = task;
this.stepCounter = new AtomicLong(0);
this.intervalMillis = unit.toMillis(interval);
this.nextExecutionAtMillis = System.currentTimeMillis() + intervalMillis;
}
/**
* Increments the stepCounter by 1
*
* <p>{@link #inc(long) inc} method
*/
@Override
public void inc() {
this.inc(1);
}
/**
* Increments the stepCounter by amount. Triggers the runnable if interval has elapsed
*
* @param amount the value to add to the stepCounter.
*/
@Override
public void inc(final long amount) {
backedCounter.inc(amount);
stepCounter.addAndGet(amount);
final long now = System.currentTimeMillis();
if (nextExecutionAtMillis < now) {
task.run();
nextExecutionAtMillis = now + intervalMillis;
}
}
public long get() {
return stepCounter.get();
}
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.metrics;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class RunnableTimedCounterTest {
@Mock Counter backedCounter;
@Test
public void shouldNotRunTaskIfIntervalNotElapsed() {
RunnableTimedCounter rtc =
new RunnableTimedCounter(
backedCounter, () -> fail("Must not be called"), 1L, TimeUnit.MINUTES);
rtc.inc();
verify(backedCounter).inc(1L);
}
@Test
public void shouldRunTaskIfIntervalElapsed() throws InterruptedException {
Runnable task = mock(Runnable.class);
RunnableTimedCounter rtc =
new RunnableTimedCounter(backedCounter, task, 1L, TimeUnit.MICROSECONDS);
Thread.sleep(1L);
rtc.inc();
verify(backedCounter).inc(1L);
verify(task).run();
}
}