From e9b9de68444d08f6ddde31d6f51720ab92271408 Mon Sep 17 00:00:00 2001 From: matkt Date: Mon, 4 Apr 2022 15:22:48 +0200 Subject: [PATCH] Add snapsync pipeline (#3656) Signed-off-by: Karim TAAM --- .../besu/ethereum/core/TrieGenerator.java | 5 +- .../messages/snap/StorageRangeMessage.java | 9 +- .../eth/sync/DefaultSynchronizer.java | 4 +- .../besu/ethereum/eth/sync/SyncMode.java | 4 +- .../eth/sync/fastsync/FastSyncActions.java | 9 + .../fastsync/FastSyncChainDownloader.java | 10 +- .../eth/sync/fastsync/FastSyncDownloader.java | 33 +- .../sync/fastsync/FastSyncTargetManager.java | 8 +- .../FastDownloaderFactory.java | 19 +- .../eth/sync/snapsync/CompleteTaskStep.java | 57 +++ .../snapsync/DynamicPivotBlockManager.java | 99 +++++ .../eth/sync/snapsync/LoadLocalDataStep.java | 74 ++++ .../eth/sync/snapsync/PersistDataStep.java | 81 ++++ .../eth/sync/snapsync/RequestDataStep.java | 210 ++++++++++ .../sync/snapsync/SnapDownloaderFactory.java | 135 ++++++ .../eth/sync/snapsync/SnapSyncDownloader.java | 66 +++ .../SnapWorldStateDownloadProcess.java | 388 ++++++++++++++++++ .../snapsync/SnapWorldStateDownloader.java | 188 +++++++++ .../request/AccountRangeDataRequest.java | 4 +- .../request/AccountTrieNodeDataRequest.java | 2 +- .../snapsync/request/SnapDataRequest.java | 18 +- .../request/StorageRangeDataRequest.java | 33 +- .../sync/worldstate/TaskQueueIterator.java | 13 +- .../snap/StorageRangeMessageTest.java | 7 +- .../fastsync/FastDownloaderFactoryTest.java | 1 + .../sync/fastsync/FastSyncActionsTest.java | 4 + .../fastsync/FastSyncChainDownloaderTest.java | 5 + .../sync/fastsync/FastSyncDownloaderTest.java | 14 +- .../worldstate/CompleteTaskStepTest.java | 1 - .../worldstate/LoadLocalDataStepTest.java | 1 - .../worldstate/PersistDataStepTest.java | 1 - .../worldstate/RequestDataStepTest.java | 1 - .../{ => fastsync}/worldstate/StubTask.java | 3 +- .../sync/snapsync/CompleteTaskStepTest.java | 149 +++++++ .../DynamicPivotBlockManagerTest.java | 123 ++++++ .../sync/snapsync/LoadLocalDataStepTest.java | 102 +++++ .../sync/snapsync/PersistDataStepTest.java | 108 +++++ .../ethereum/eth/sync/snapsync/StubTask.java | 52 +++ .../eth/sync/snapsync/TaskGenerator.java | 144 +++++++ .../services/pipeline/BatchingReadPipe.java | 35 +- .../besu/services/pipeline/Pipe.java | 3 +- .../services/pipeline/PipelineBuilder.java | 30 ++ .../besu/services/pipeline/ReadPipe.java | 3 +- 43 files changed, 2177 insertions(+), 79 deletions(-) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/{ => worldstate}/FastDownloaderFactory.java (90%) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManager.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncDownloader.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/StubTask.java (91%) create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStepTest.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManagerTest.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStepTest.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StubTask.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/TaskGenerator.java diff --git a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/TrieGenerator.java b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/TrieGenerator.java index ea44362f0..63ef3d692 100644 --- a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/TrieGenerator.java +++ b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/TrieGenerator.java @@ -51,12 +51,13 @@ public class TrieGenerator { (location, hash, value) -> updater.putAccountStorageTrieNode( accountHash.get(accountIndex), location, hash, value)); - final Hash codeHash = Hash.hash(Bytes32.leftPad(Bytes.of(i + 10))); + final Bytes code = Bytes32.leftPad(Bytes.of(i + 10)); + final Hash codeHash = Hash.hash(code); final StateTrieAccountValue accountValue = new StateTrieAccountValue(1L, Wei.of(2L), Hash.wrap(storageTrie.getRootHash()), codeHash); accountStateTrie.put(accountHash.get(i), RLP.encode(accountValue::writeTo)); accountStateTrie.commit(updater::putAccountStateTrieNode); - + updater.putCode(codeHash, code); // Persist updates updater.commit(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessage.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessage.java index 49d39c9ee..ab4a353fe 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessage.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessage.java @@ -22,7 +22,6 @@ import org.hyperledger.besu.ethereum.rlp.RLPInput; import java.math.BigInteger; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.TreeMap; @@ -50,13 +49,13 @@ public final class StorageRangeMessage extends AbstractSnapMessageData { } public static StorageRangeMessage create( - final ArrayDeque> slots, final List proof) { + final ArrayDeque> slots, final List proof) { return create(Optional.empty(), slots, proof); } public static StorageRangeMessage create( final Optional requestId, - final ArrayDeque> slots, + final ArrayDeque> slots, final List proof) { final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); tmp.startList(); @@ -89,7 +88,7 @@ public final class StorageRangeMessage extends AbstractSnapMessageData { } public SlotRangeData slotsData(final boolean withRequestId) { - final ArrayDeque> slots = new ArrayDeque<>(); + final ArrayDeque> slots = new ArrayDeque<>(); final ArrayDeque proofs = new ArrayDeque<>(); final RLPInput input = new BytesValueRLPInput(data, false); input.enterList(); @@ -121,7 +120,7 @@ public final class StorageRangeMessage extends AbstractSnapMessageData { @Value.Immutable public interface SlotRangeData { - ArrayDeque> slots(); + ArrayDeque> slots(); ArrayDeque proofs(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index eaa6bb8ef..401315771 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -19,9 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastDownloaderFactory; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloaderFactory; import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader; import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; @@ -51,7 +51,7 @@ public class DefaultSynchronizer implements Synchronizer { private final SyncState syncState; private final AtomicBoolean running = new AtomicBoolean(false); private final BlockPropagationManager blockPropagationManager; - private final Optional fastSyncDownloader; + private final Optional> fastSyncDownloader; private final FullSyncDownloader fullSyncDownloader; private final ProtocolContext protocolContext; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java index f61eacf3d..cbba5aabe 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java @@ -18,7 +18,9 @@ public enum SyncMode { // Fully validate all blocks as they sync FULL, // Perform light validation on older blocks, and switch to full validation for more recent blocks - FAST; + FAST, + // Perform snapsync + X_SNAP; public static SyncMode fromString(final String str) { for (final SyncMode mode : SyncMode.values()) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index 1183617f2..82869f044 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; @@ -44,6 +45,7 @@ public class FastSyncActions { private static final Logger LOG = LoggerFactory.getLogger(FastSyncActions.class); private final SynchronizerConfiguration syncConfig; + private final WorldStateStorage worldStateStorage; private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; private final EthContext ethContext; @@ -54,12 +56,14 @@ public class FastSyncActions { public FastSyncActions( final SynchronizerConfiguration syncConfig, + final WorldStateStorage worldStateStorage, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, final SyncState syncState, final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; + this.worldStateStorage = worldStateStorage; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; @@ -78,6 +82,10 @@ public class FastSyncActions { pivotBlockGauge::get); } + public SyncState getSyncState() { + return syncState; + } + public CompletableFuture waitForSuitablePeers(final FastSyncState fastSyncState) { if (fastSyncState.hasPivotBlockHeader()) { return waitForAnyPeer().thenApply(ignore -> fastSyncState); @@ -193,6 +201,7 @@ public class FastSyncActions { public ChainDownloader createChainDownloader(final FastSyncState currentState) { return FastSyncChainDownloader.create( syncConfig, + worldStateStorage, protocolSchedule, protocolContext, ethContext, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java index bc6f9f1c7..49b20123e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java @@ -21,6 +21,7 @@ import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.plugin.services.MetricsSystem; public class FastSyncChainDownloader { @@ -29,6 +30,7 @@ public class FastSyncChainDownloader { public static ChainDownloader create( final SynchronizerConfiguration config, + final WorldStateStorage worldStateStorage, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, @@ -38,7 +40,13 @@ public class FastSyncChainDownloader { final FastSyncTargetManager syncTargetManager = new FastSyncTargetManager( - config, protocolSchedule, protocolContext, ethContext, metricsSystem, fastSyncState); + config, + worldStateStorage, + protocolSchedule, + protocolContext, + ethContext, + metricsSystem, + fastSyncState); return new PipelineChainDownloader( syncState, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java index 100f89c30..eccc5f52a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java @@ -19,7 +19,6 @@ import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose; import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; -import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -33,33 +32,35 @@ import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FastSyncDownloader { +public class FastSyncDownloader { private static final Duration FAST_SYNC_RETRY_DELAY = Duration.ofSeconds(5); private static final Logger LOG = LoggerFactory.getLogger(FastSyncDownloader.class); - private final FastSyncActions fastSyncActions; private final WorldStateStorage worldStateStorage; private final WorldStateDownloader worldStateDownloader; - private final FastSyncStateStorage fastSyncStateStorage; - private final TaskCollection taskCollection; + private final TaskCollection taskCollection; private final Path fastSyncDataDirectory; - private final FastSyncState initialFastSyncState; private volatile Optional trailingPeerRequirements = Optional.empty(); private final AtomicBoolean running = new AtomicBoolean(false); + protected final FastSyncActions fastSyncActions; + protected final FastSyncStateStorage fastSyncStateStorage; + protected FastSyncState initialFastSyncState; + public FastSyncDownloader( final FastSyncActions fastSyncActions, final WorldStateStorage worldStateStorage, final WorldStateDownloader worldStateDownloader, final FastSyncStateStorage fastSyncStateStorage, - final TaskCollection taskCollection, + final TaskCollection taskCollection, final Path fastSyncDataDirectory, final FastSyncState initialFastSyncState) { this.fastSyncActions = fastSyncActions; @@ -78,11 +79,17 @@ public class FastSyncDownloader { return start(initialFastSyncState); } - private CompletableFuture start(final FastSyncState fastSyncState) { + protected CompletableFuture start(final FastSyncState fastSyncState) { LOG.info("Starting fast sync."); if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) { worldStateStorage.clear(); } + return findPivotBlock(fastSyncState, fss -> downloadChainAndWorldState(fastSyncActions, fss)); + } + + public CompletableFuture findPivotBlock( + final FastSyncState fastSyncState, + final Function> onNewPivotBlock) { return exceptionallyCompose( fastSyncActions .waitForSuitablePeers(fastSyncState) @@ -90,11 +97,11 @@ public class FastSyncDownloader { .thenCompose(fastSyncActions::downloadPivotBlockHeader) .thenApply(this::updateMaxTrailingPeers) .thenApply(this::storeState) - .thenCompose(fss -> downloadChainAndWorldState(fastSyncActions, fss)), + .thenCompose(onNewPivotBlock), this::handleFailure); } - private CompletableFuture handleFailure(final Throwable error) { + protected CompletableFuture handleFailure(final Throwable error) { trailingPeerRequirements = Optional.empty(); Throwable rootCause = ExceptionUtils.rootCause(error); if (rootCause instanceof FastSyncException) { @@ -139,7 +146,7 @@ public class FastSyncDownloader { } } - private FastSyncState updateMaxTrailingPeers(final FastSyncState state) { + protected FastSyncState updateMaxTrailingPeers(final FastSyncState state) { if (state.getPivotBlockNumber().isPresent()) { trailingPeerRequirements = Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 0)); @@ -149,12 +156,12 @@ public class FastSyncDownloader { return state; } - private FastSyncState storeState(final FastSyncState state) { + protected FastSyncState storeState(final FastSyncState state) { fastSyncStateStorage.storeState(state); return state; } - private CompletableFuture downloadChainAndWorldState( + protected CompletableFuture downloadChainAndWorldState( final FastSyncActions fastSyncActions, final FastSyncState currentState) { // Synchronized ensures that stop isn't called while we're in the process of starting a // world state and chain download. If it did we might wind up starting a new download diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java index 30b0ee18d..1700fb476 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; class FastSyncTargetManager extends SyncTargetManager { private static final Logger LOG = LoggerFactory.getLogger(FastSyncTargetManager.class); + private final WorldStateStorage worldStateStorage; private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; private final EthContext ethContext; @@ -46,12 +48,14 @@ class FastSyncTargetManager extends SyncTargetManager { public FastSyncTargetManager( final SynchronizerConfiguration config, + final WorldStateStorage worldStateStorage, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, final MetricsSystem metricsSystem, final FastSyncState fastSyncState) { super(config, protocolSchedule, protocolContext, ethContext, metricsSystem); + this.worldStateStorage = worldStateStorage; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; @@ -124,6 +128,8 @@ class FastSyncTargetManager extends SyncTargetManager { @Override public boolean shouldContinueDownloading() { final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get(); - return !protocolContext.getBlockchain().getChainHeadHash().equals(pivotBlockHeader.getHash()); + return !protocolContext.getBlockchain().getChainHeadHash().equals(pivotBlockHeader.getHash()) + || !worldStateStorage.isWorldStateAvailable( + pivotBlockHeader.getStateRoot(), pivotBlockHeader.getBlockHash()); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java similarity index 90% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index a5f731bfb..2996bed04 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.fastsync; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage; @@ -20,8 +20,10 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; -import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastWorldStateDownloader; -import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncStateStorage; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; @@ -44,11 +46,11 @@ import org.slf4j.LoggerFactory; public class FastDownloaderFactory { - private static final String FAST_SYNC_FOLDER = "fastsync"; + protected static final String FAST_SYNC_FOLDER = "fastsync"; private static final Logger LOG = LoggerFactory.getLogger(FastDownloaderFactory.class); - public static Optional create( + public static Optional> create( final SynchronizerConfiguration syncConfig, final Path dataDirectory, final ProtocolSchedule protocolSchedule, @@ -107,10 +109,11 @@ public class FastDownloaderFactory { syncConfig.getWorldStateMinMillisBeforeStalling(), clock, metricsSystem); - final FastSyncDownloader fastSyncDownloader = - new FastSyncDownloader( + final FastSyncDownloader fastSyncDownloader = + new FastSyncDownloader<>( new FastSyncActions( syncConfig, + worldStateStorage, protocolSchedule, protocolContext, ethContext, @@ -145,7 +148,7 @@ public class FastDownloaderFactory { } } - private static void ensureDirectoryExists(final File dir) { + protected static void ensureDirectoryExists(final File dir) { if (!dir.mkdirs() && !dir.isDirectory()) { throw new IllegalStateException("Unable to create directory: " + dir.getAbsolutePath()); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java new file mode 100644 index 000000000..33c92f64a --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java @@ -0,0 +1,57 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.TrieNodeDataRequest; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.services.tasks.Task; + +public class CompleteTaskStep { + private final SnapSyncState snapSyncState; + private final Counter completedRequestsCounter; + private final Counter retriedRequestsCounter; + + public CompleteTaskStep(final SnapSyncState snapSyncState, final MetricsSystem metricsSystem) { + this.snapSyncState = snapSyncState; + completedRequestsCounter = + metricsSystem.createCounter( + BesuMetricCategory.SYNCHRONIZER, + "snap_world_state_completed_requests_total", + "Total number of node data requests completed as part of snap sync world state download"); + retriedRequestsCounter = + metricsSystem.createCounter( + BesuMetricCategory.SYNCHRONIZER, + "snap_world_state_retried_requests_total", + "Total number of node data requests repeated as part of snap sync world state download"); + } + + public synchronized void markAsCompleteOrFailed( + final SnapWorldDownloadState downloadState, final Task task) { + if (task.getData().isValid() + || (task.getData() instanceof TrieNodeDataRequest + && task.getData().isExpired(snapSyncState))) { + completedRequestsCounter.inc(); + task.markCompleted(); + downloadState.checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow()); + } else { + retriedRequestsCounter.inc(); + task.markFailed(); + } + downloadState.notifyTaskAvailable(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManager.java new file mode 100644 index 000000000..e7e7a07d6 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManager.java @@ -0,0 +1,99 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; +import org.hyperledger.besu.services.tasks.TasksPriorityProvider; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamicPivotBlockManager { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicPivotBlockManager.class); + + private final AtomicBoolean isSearchingPivotBlock = new AtomicBoolean(false); + private final AtomicBoolean isUpdatingPivotBlock = new AtomicBoolean(false); + + private final WorldDownloadState worldDownloadState; + + private final FastSyncActions syncActions; + + private final FastSyncState syncState; + + private Optional lastBlockFound; + + public DynamicPivotBlockManager( + final WorldDownloadState worldDownloadState, + final FastSyncActions fastSyncActions, + final SnapSyncState fastSyncState) { + this.worldDownloadState = worldDownloadState; + this.syncActions = fastSyncActions; + this.syncState = fastSyncState; + this.lastBlockFound = Optional.empty(); + } + + public void check(final Consumer onNewPivotBlock) { + syncState + .getPivotBlockNumber() + .ifPresent( + blockNumber -> { + final long currentPivotBlockNumber = syncState.getPivotBlockNumber().orElseThrow(); + final long distanceNextPivotBlock = + syncActions.getSyncState().bestChainHeight() + - lastBlockFound + .map(ProcessableBlockHeader::getNumber) + .orElse(currentPivotBlockNumber); + if (distanceNextPivotBlock > 60 && isSearchingPivotBlock.compareAndSet(false, true)) { + syncActions + .waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE) + .thenCompose(syncActions::selectPivotBlock) + .thenCompose(syncActions::downloadPivotBlockHeader) + .thenAccept(fss -> lastBlockFound = fss.getPivotBlockHeader()) + .orTimeout(5, TimeUnit.MINUTES) + .whenComplete((unused, throwable) -> isSearchingPivotBlock.set(false)); + } + + final long distance = + syncActions.getSyncState().bestChainHeight() - currentPivotBlockNumber; + if (distance > 126 && isUpdatingPivotBlock.compareAndSet(false, true)) { + switchToNewPivotBlock(onNewPivotBlock); + isUpdatingPivotBlock.set(false); + } + }); + } + + private void switchToNewPivotBlock(final Consumer onNewPivotBlock) { + lastBlockFound.ifPresent( + blockHeader -> { + LOG.info( + "Select new pivot block {} {}", blockHeader.getNumber(), blockHeader.getStateRoot()); + syncState.setCurrentHeader(blockHeader); + onNewPivotBlock.accept(blockHeader); + worldDownloadState.requestComplete(true); + worldDownloadState.notifyTaskAvailable(); + }); + lastBlockFound = Optional.empty(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java new file mode 100644 index 000000000..d148fea5d --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java @@ -0,0 +1,74 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.TrieNodeDataRequest; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.services.pipeline.Pipe; +import org.hyperledger.besu.services.tasks.Task; + +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.tuweni.bytes.Bytes; + +public class LoadLocalDataStep { + + private final WorldStateStorage worldStateStorage; + private final SnapWorldDownloadState downloadState; + private final SnapSyncState snapSyncState; + private final Counter existingNodeCounter; + + public LoadLocalDataStep( + final WorldStateStorage worldStateStorage, + final SnapWorldDownloadState downloadState, + final MetricsSystem metricsSystem, + final SnapSyncState snapSyncState) { + this.worldStateStorage = worldStateStorage; + this.downloadState = downloadState; + existingNodeCounter = + metricsSystem.createCounter( + BesuMetricCategory.SYNCHRONIZER, + "snap_world_state_existing_nodes_total", + "Total number of node data requests completed using existing data"); + this.snapSyncState = snapSyncState; + } + + public Stream> loadLocalDataTrieNode( + final Task task, final Pipe> completedTasks) { + final TrieNodeDataRequest request = (TrieNodeDataRequest) task.getData(); + // check if node is already stored in the worldstate + if (snapSyncState.hasPivotBlockHeader()) { + Optional existingData = request.getExistingData(worldStateStorage); + if (existingData.isPresent()) { + existingNodeCounter.inc(); + request.setData(existingData.get()); + request.setRequiresPersisting(false); + request.setRootHash(snapSyncState.getPivotBlockHeader().get().getStateRoot()); + final WorldStateStorage.Updater updater = worldStateStorage.updater(); + request.persist(worldStateStorage, updater, downloadState, snapSyncState); + updater.commit(); + downloadState.enqueueRequests(request.getRootStorageRequests(worldStateStorage)); + completedTasks.put(task); + return Stream.empty(); + } + } + return Stream.of(task); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java new file mode 100644 index 000000000..8b18befcd --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java @@ -0,0 +1,81 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.TrieNodeDataRequest; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.services.tasks.Task; + +import java.util.List; +import java.util.stream.Stream; + +public class PersistDataStep { + + private final SnapSyncState snapSyncState; + private final WorldStateStorage worldStateStorage; + private final SnapWorldDownloadState downloadState; + + public PersistDataStep( + final SnapSyncState snapSyncState, + final WorldStateStorage worldStateStorage, + final SnapWorldDownloadState downloadState) { + this.snapSyncState = snapSyncState; + this.worldStateStorage = worldStateStorage; + this.downloadState = downloadState; + } + + public List> persist(final List> tasks) { + final WorldStateStorage.Updater updater = worldStateStorage.updater(); + for (Task task : tasks) { + if (task.getData().isValid()) { + // enqueue child requests + final Stream childRequests = + task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState); + if (!(task.getData() instanceof TrieNodeDataRequest)) { + enqueueChildren(childRequests); + } else { + if (!task.getData().isExpired(snapSyncState)) { + enqueueChildren(childRequests); + } else if (childRequests.findAny().isPresent()) { + // not saved because it's expired and missing child requests + return tasks; + } + } + + // persist nodes + final int persistedNodes = + task.getData().persist(worldStateStorage, updater, downloadState, snapSyncState); + if (persistedNodes > 0) { + if (task.getData() instanceof TrieNodeDataRequest) { + downloadState.getHealedNodes().inc(persistedNodes); + } else { + downloadState.getGeneratedNodes().inc(persistedNodes); + } + } + } + } + updater.commit(); + return tasks; + } + + public Task persist(final Task task) { + return persist(List.of(task)).get(0); + } + + private void enqueueChildren(final Stream childRequests) { + downloadState.enqueueRequests(childRequests); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java new file mode 100644 index 000000000..b16cd9178 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java @@ -0,0 +1,210 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetAccountRangeFromPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetBytecodeFromPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetStorageRangeFromPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetTrieNodeFromPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; +import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.TrieNodeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; +import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.services.tasks.Task; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; +import kotlin.collections.ArrayDeque; +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; + +public class RequestDataStep { + + private final SnapSyncState fastSyncState; + private final WorldDownloadState downloadState; + private final MetricsSystem metricsSystem; + private final EthContext ethContext; + private final WorldStateProofProvider worldStateProofProvider; + + public RequestDataStep( + final EthContext ethContext, + final WorldStateStorage worldStateStorage, + final SnapSyncState fastSyncState, + final WorldDownloadState downloadState, + final MetricsSystem metricsSystem) { + this.fastSyncState = fastSyncState; + this.downloadState = downloadState; + this.metricsSystem = metricsSystem; + this.ethContext = ethContext; + this.worldStateProofProvider = new WorldStateProofProvider(worldStateStorage); + } + + public CompletableFuture> requestAccount( + final Task requestTask) { + + final BlockHeader blockHeader = fastSyncState.getPivotBlockHeader().get(); + final AccountRangeDataRequest accountDataRequest = + (AccountRangeDataRequest) requestTask.getData(); + final EthTask getAccountTask = + RetryingGetAccountRangeFromPeerTask.forAccountRange( + ethContext, + accountDataRequest.getStartKeyHash(), + accountDataRequest.getEndKeyHash(), + blockHeader, + metricsSystem); + downloadState.addOutstandingTask(getAccountTask); + return getAccountTask + .run() + .handle( + (response, error) -> { + if (response != null) { + downloadState.removeOutstandingTask(getAccountTask); + accountDataRequest.setRootHash(blockHeader.getStateRoot()); + accountDataRequest.setAccounts(response.accounts()); + accountDataRequest.setProofs(response.proofs()); + accountDataRequest.checkProof( + downloadState, worldStateProofProvider, fastSyncState); + } + return requestTask; + }); + } + + public CompletableFuture>> requestStorage( + final List> requestTasks) { + final List accountHashes = + requestTasks.stream() + .map(Task::getData) + .map(StorageRangeDataRequest.class::cast) + .map(StorageRangeDataRequest::getAccountHash) + .collect(Collectors.toList()); + final BlockHeader blockHeader = fastSyncState.getPivotBlockHeader().get(); + final Bytes32 minRange = + requestTasks.size() == 1 + ? ((StorageRangeDataRequest) requestTasks.get(0).getData()).getStartKeyHash() + : RangeManager.MIN_RANGE; + final Bytes32 maxRange = + requestTasks.size() == 1 + ? ((StorageRangeDataRequest) requestTasks.get(0).getData()).getEndKeyHash() + : RangeManager.MAX_RANGE; + final EthTask getStorageRangeTask = + RetryingGetStorageRangeFromPeerTask.forStorageRange( + ethContext, accountHashes, minRange, maxRange, blockHeader, metricsSystem); + downloadState.addOutstandingTask(getStorageRangeTask); + return getStorageRangeTask + .run() + .handle( + (response, error) -> { + if (response != null) { + downloadState.removeOutstandingTask(getStorageRangeTask); + for (int i = 0; i < response.slots().size(); i++) { + final StorageRangeDataRequest request = + (StorageRangeDataRequest) requestTasks.get(i).getData(); + request.setRootHash(blockHeader.getStateRoot()); + request.setSlots(response.slots().get(i)); + request.setProofs( + i < response.slots().size() - 1 ? new ArrayDeque<>() : response.proofs()); + request.checkProof(downloadState, worldStateProofProvider, fastSyncState); + } + } + return requestTasks; + }); + } + + public CompletableFuture>> requestCode( + final List> requestTasks) { + final List codeHashes = + requestTasks.stream() + .map(Task::getData) + .map(BytecodeRequest.class::cast) + .map(BytecodeRequest::getCodeHash) + .distinct() + .collect(Collectors.toList()); + final BlockHeader blockHeader = fastSyncState.getPivotBlockHeader().get(); + final EthTask> getByteCodeTask = + RetryingGetBytecodeFromPeerTask.forByteCode( + ethContext, codeHashes, blockHeader, metricsSystem); + downloadState.addOutstandingTask(getByteCodeTask); + return getByteCodeTask + .run() + .handle( + (response, error) -> { + if (response != null) { + downloadState.removeOutstandingTask(getByteCodeTask); + for (Task requestTask : requestTasks) { + final BytecodeRequest request = (BytecodeRequest) requestTask.getData(); + request.setRootHash(blockHeader.getStateRoot()); + if (response.containsKey(request.getCodeHash())) { + request.setCode(response.get(request.getCodeHash())); + } + } + } + return requestTasks; + }); + } + + public CompletableFuture>> requestTrieNodeByPath( + final List> requestTasks) { + + final BlockHeader blockHeader = fastSyncState.getPivotBlockHeader().get(); + final Map> message = new HashMap<>(); + requestTasks.stream() + .map(Task::getData) + .map(TrieNodeDataRequest.class::cast) + .map(TrieNodeDataRequest::getTrieNodePath) + .forEach( + path -> { + final List bytes = + message.computeIfAbsent(path.get(0), k -> Lists.newArrayList()); + if (path.size() > 1) { + bytes.add(path.get(1)); + } + }); + final EthTask> getTrieNodeFromPeerTask = + RetryingGetTrieNodeFromPeerTask.forTrieNodes( + ethContext, message, blockHeader, metricsSystem); + downloadState.addOutstandingTask(getTrieNodeFromPeerTask); + return getTrieNodeFromPeerTask + .run() + .handle( + (response, error) -> { + if (response != null) { + downloadState.removeOutstandingTask(getTrieNodeFromPeerTask); + for (final Task task : requestTasks) { + final TrieNodeDataRequest request = (TrieNodeDataRequest) task.getData(); + final Bytes matchingData = response.get(request.getPathId()); + if (matchingData != null) { + request.setData(matchingData); + } + } + } + return requestTasks; + }); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java new file mode 100644 index 000000000..224adeac6 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -0,0 +1,135 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.ProtocolContext; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.sync.SyncMode; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncStateStorage; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloaderFactory; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues; + +import java.nio.file.Path; +import java.time.Clock; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnapDownloaderFactory extends FastDownloaderFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SnapDownloaderFactory.class); + + public static Optional> createSnapDownloader( + final SynchronizerConfiguration syncConfig, + final Path dataDirectory, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final MetricsSystem metricsSystem, + final EthContext ethContext, + final WorldStateStorage worldStateStorage, + final SyncState syncState, + final Clock clock) { + + final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER); + final FastSyncStateStorage fastSyncStateStorage = + new FastSyncStateStorage(fastSyncDataDirectory); + + if (syncConfig.getSyncMode() != SyncMode.X_SNAP) { + if (fastSyncStateStorage.isFastSyncInProgress()) { + throw new IllegalStateException( + "Unable to change the sync mode when snap sync is incomplete, please restart with snap sync mode"); + } else { + return Optional.empty(); + } + } + + ensureDirectoryExists(fastSyncDataDirectory.toFile()); + + final FastSyncState fastSyncState = + fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)); + if (fastSyncState.getPivotBlockHeader().isEmpty() + && protocolContext.getBlockchain().getChainHeadBlockNumber() + != BlockHeader.GENESIS_BLOCK_NUMBER) { + LOG.info( + "Snap sync was requested, but cannot be enabled because the local blockchain is not empty."); + return Optional.empty(); + } + + final SnapSyncState snapSyncState = + new SnapSyncState( + fastSyncStateStorage.loadState( + ScheduleBasedBlockHeaderFunctions.create(protocolSchedule))); + worldStateStorage.clear(); + + final InMemoryTasksPriorityQueues snapTaskCollection = + createSnapWorldStateDownloaderTaskCollection(metricsSystem); + final WorldStateDownloader snapWorldStateDownloader = + new SnapWorldStateDownloader( + ethContext, + worldStateStorage, + snapTaskCollection, + syncConfig.getWorldStateHashCountPerRequest(), + syncConfig.getWorldStateRequestParallelism(), + syncConfig.getWorldStateMaxRequestsWithoutProgress(), + syncConfig.getWorldStateMinMillisBeforeStalling(), + clock, + metricsSystem); + final FastSyncDownloader fastSyncDownloader = + new SnapSyncDownloader( + new FastSyncActions( + syncConfig, + worldStateStorage, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem), + worldStateStorage, + snapWorldStateDownloader, + fastSyncStateStorage, + snapTaskCollection, + fastSyncDataDirectory, + snapSyncState); + syncState.setWorldStateDownloadStatus(snapWorldStateDownloader); + return Optional.of(fastSyncDownloader); + } + + private static InMemoryTasksPriorityQueues + createSnapWorldStateDownloaderTaskCollection(final MetricsSystem metricsSystem) { + final InMemoryTasksPriorityQueues taskCollection = + new InMemoryTasksPriorityQueues<>(); + + metricsSystem.createLongGauge( + BesuMetricCategory.SYNCHRONIZER, + "snap_world_state_pending_requests_current", + "Number of pending requests for snap sync world state download", + taskCollection::size); + return taskCollection; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncDownloader.java new file mode 100644 index 000000000..41b51972a --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncDownloader.java @@ -0,0 +1,66 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncStateStorage; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.services.tasks.TaskCollection; + +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnapSyncDownloader extends FastSyncDownloader { + + private static final Logger LOG = LoggerFactory.getLogger(SnapSyncDownloader.class); + + public SnapSyncDownloader( + final FastSyncActions fastSyncActions, + final WorldStateStorage worldStateStorage, + final WorldStateDownloader worldStateDownloader, + final FastSyncStateStorage fastSyncStateStorage, + final TaskCollection taskCollection, + final Path fastSyncDataDirectory, + final FastSyncState initialFastSyncState) { + super( + fastSyncActions, + worldStateStorage, + worldStateDownloader, + fastSyncStateStorage, + taskCollection, + fastSyncDataDirectory, + initialFastSyncState); + } + + @Override + protected CompletableFuture start(final FastSyncState fastSyncState) { + LOG.info("Starting snap sync."); + return findPivotBlock(fastSyncState, fss -> downloadChainAndWorldState(fastSyncActions, fss)); + } + + @Override + protected FastSyncState storeState(final FastSyncState fastSyncState) { + initialFastSyncState = fastSyncState; + fastSyncStateStorage.storeState(fastSyncState); + return new SnapSyncState(fastSyncState); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java new file mode 100644 index 000000000..833c4b746 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java @@ -0,0 +1,388 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom; + +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.TaskQueueIterator; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadProcess; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; +import org.hyperledger.besu.services.pipeline.Pipe; +import org.hyperledger.besu.services.pipeline.Pipeline; +import org.hyperledger.besu.services.pipeline.PipelineBuilder; +import org.hyperledger.besu.services.pipeline.WritePipe; +import org.hyperledger.besu.services.tasks.Task; +import org.hyperledger.besu.util.ExceptionUtils; + +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess { + + private static final Logger LOG = LoggerFactory.getLogger(SnapWorldStateDownloadProcess.class); + private final Pipeline> completionPipeline; + private final Pipeline> fetchAccountPipeline; + private final Pipeline> fetchStorageDataPipeline; + private final Pipeline> fetchBigStorageDataPipeline; + private final Pipeline> fetchCodePipeline; + private final Pipeline> fetchHealPipeline; + private final WritePipe> requestsToComplete; + + private SnapWorldStateDownloadProcess( + final Pipeline> fetchAccountPipeline, + final Pipeline> fetchStorageDataPipeline, + final Pipeline> fetchBigStorageDataPipeline, + final Pipeline> fetchCodePipeline, + final Pipeline> fetchHealPipeline, + final Pipeline> completionPipeline, + final WritePipe> requestsToComplete) { + this.fetchStorageDataPipeline = fetchStorageDataPipeline; + this.fetchAccountPipeline = fetchAccountPipeline; + this.fetchBigStorageDataPipeline = fetchBigStorageDataPipeline; + this.fetchCodePipeline = fetchCodePipeline; + this.fetchHealPipeline = fetchHealPipeline; + this.completionPipeline = completionPipeline; + this.requestsToComplete = requestsToComplete; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public CompletableFuture start(final EthScheduler ethScheduler) { + final CompletableFuture fetchAccountFuture = + ethScheduler.startPipeline(fetchAccountPipeline); + final CompletableFuture fetchStorageFuture = + ethScheduler.startPipeline(fetchStorageDataPipeline); + final CompletableFuture fetchBigStorageFuture = + ethScheduler.startPipeline(fetchBigStorageDataPipeline); + final CompletableFuture fetchCodeFuture = ethScheduler.startPipeline(fetchCodePipeline); + final CompletableFuture fetchHealFuture = ethScheduler.startPipeline(fetchHealPipeline); + final CompletableFuture completionFuture = ethScheduler.startPipeline(completionPipeline); + + fetchAccountFuture + .thenCombine(fetchStorageFuture, (unused, unused2) -> null) + .thenCombine(fetchBigStorageFuture, (unused, unused2) -> null) + .thenCombine(fetchCodeFuture, (unused, unused2) -> null) + .thenCombine(fetchHealFuture, (unused, unused2) -> null) + .whenComplete( + (result, error) -> { + if (error != null) { + if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { + LOG.error("Pipeline failed", error); + } + completionPipeline.abort(); + } else { + // No more data to fetch, so propagate the pipe closure onto the completion pipe. + requestsToComplete.close(); + } + }); + + completionFuture.exceptionally( + error -> { + if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { + LOG.error("Pipeline failed", error); + } + fetchAccountPipeline.abort(); + fetchStorageDataPipeline.abort(); + fetchBigStorageDataPipeline.abort(); + fetchCodePipeline.abort(); + fetchHealPipeline.abort(); + return null; + }); + return completionFuture; + } + + @Override + public void abort() { + fetchAccountPipeline.abort(); + fetchStorageDataPipeline.abort(); + fetchBigStorageDataPipeline.abort(); + fetchCodePipeline.abort(); + fetchHealPipeline.abort(); + completionPipeline.abort(); + } + + public static class Builder { + + private int taskCountPerRequest; + private int maxOutstandingRequests; + private SnapWorldDownloadState downloadState; + private MetricsSystem metricsSystem; + private LoadLocalDataStep loadLocalDataStep; + private RequestDataStep requestDataStep; + private SnapSyncState snapSyncState; + private PersistDataStep persistDataStep; + private CompleteTaskStep completeTaskStep; + private DynamicPivotBlockManager pivotBlockManager; + + public Builder taskCountPerRequest(final int taskCountPerRequest) { + this.taskCountPerRequest = taskCountPerRequest; + return this; + } + + public Builder pivotBlockManager( + final DynamicPivotBlockManager pivotBlockManager) { + this.pivotBlockManager = pivotBlockManager; + return this; + } + + public Builder maxOutstandingRequests(final int maxOutstandingRequests) { + this.maxOutstandingRequests = maxOutstandingRequests; + return this; + } + + public Builder loadLocalDataStep(final LoadLocalDataStep loadLocalDataStep) { + this.loadLocalDataStep = loadLocalDataStep; + return this; + } + + public Builder requestDataStep(final RequestDataStep requestDataStep) { + this.requestDataStep = requestDataStep; + return this; + } + + public Builder persistDataStep(final PersistDataStep persistDataStep) { + this.persistDataStep = persistDataStep; + return this; + } + + public Builder completeTaskStep(final CompleteTaskStep completeTaskStep) { + this.completeTaskStep = completeTaskStep; + return this; + } + + public Builder downloadState(final SnapWorldDownloadState downloadState) { + this.downloadState = downloadState; + return this; + } + + public Builder fastSyncState(final SnapSyncState fastSyncState) { + this.snapSyncState = fastSyncState; + return this; + } + + public Builder metricsSystem(final MetricsSystem metricsSystem) { + this.metricsSystem = metricsSystem; + return this; + } + + public SnapWorldStateDownloadProcess build() { + checkNotNull(loadLocalDataStep); + checkNotNull(requestDataStep); + checkNotNull(persistDataStep); + checkNotNull(completeTaskStep); + checkNotNull(downloadState); + checkNotNull(snapSyncState); + checkNotNull(metricsSystem); + + // Room for the requests we expect to do in parallel plus some buffer but not unlimited. + final int bufferCapacity = taskCountPerRequest * 2; + final LabelledMetric outputCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.SYNCHRONIZER, + "snap_world_state_pipeline_processed_total", + "Number of entries processed by each world state download pipeline stage", + "step", + "action"); + + final Pipeline> completionPipeline = + PipelineBuilder.>createPipeline( + "requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request") + .andFinishWith( + "requestCompleteTask", + task -> completeTaskStep.markAsCompleteOrFailed(downloadState, task)); + + final Pipe> requestsToComplete = completionPipeline.getInputPipe(); + + final Pipeline> fetchAccountDataPipeline = + createPipelineFrom( + "dequeueAccountRequestBlocking", + new TaskQueueIterator<>( + downloadState, () -> downloadState.dequeueAccountRequestBlocking()), + bufferCapacity, + outputCounter, + true, + "world_state_download") + .thenProcess( + "checkNewPivotBlock", + tasks -> { + pivotBlockManager.check(blockHeader -> {}); + return tasks; + }) + .thenProcessAsync( + "batchDownloadData", + requestTask -> requestDataStep.requestAccount(requestTask), + maxOutstandingRequests) + .thenProcess("batchPersistData", task -> persistDataStep.persist(task)) + .andFinishWith("batchDataDownloaded", requestsToComplete::put); + + final Pipeline> fetchStorageDataPipeline = + createPipelineFrom( + "dequeueStorageRequestBlocking", + new TaskQueueIterator<>( + downloadState, () -> downloadState.dequeueStorageRequestBlocking()), + bufferCapacity, + outputCounter, + true, + "world_state_download") + .inBatches(84) + .thenProcess( + "checkNewPivotBlock", + tasks -> { + pivotBlockManager.check(blockHeader -> {}); + return tasks; + }) + .thenProcessAsyncOrdered( + "batchDownloadData", + requestTask -> requestDataStep.requestStorage(requestTask), + maxOutstandingRequests) + .thenProcess("batchPersistData", task -> persistDataStep.persist(task)) + .andFinishWith( + "batchDataDownloaded", + tasks -> { + tasks.forEach(requestsToComplete::put); + }); + + final Pipeline> fetchBigStorageDataPipeline = + createPipelineFrom( + "dequeueBigStorageRequestBlocking", + new TaskQueueIterator<>( + downloadState, () -> downloadState.dequeueBigStorageRequestBlocking()), + bufferCapacity, + outputCounter, + true, + "world_state_download") + .thenProcess( + "checkNewPivotBlock", + tasks -> { + pivotBlockManager.check(blockHeader -> {}); + return tasks; + }) + .thenProcessAsyncOrdered( + "batchDownloadData", + requestTask -> requestDataStep.requestStorage(List.of(requestTask)), + maxOutstandingRequests) + .thenProcess( + "batchPersistData", + task -> { + persistDataStep.persist(task); + return task; + }) + .andFinishWith( + "batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put)); + + final Pipeline> fetchCodePipeline = + createPipelineFrom( + "dequeueCodeRequestBlocking", + new TaskQueueIterator<>( + downloadState, () -> downloadState.dequeueCodeRequestBlocking()), + bufferCapacity, + outputCounter, + true, + "code_blocks_download_pipeline") + .inBatches( + taskCountPerRequest, + tasks -> + 84 + - (int) + tasks.stream() + .map(Task::getData) + .map(BytecodeRequest.class::cast) + .map(BytecodeRequest::getCodeHash) + .distinct() + .count()) + .thenProcess( + "checkNewPivotBlock", + tasks -> { + pivotBlockManager.check( + blockHeader -> { + if (snapSyncState.isHealInProgress()) downloadState.clearTrieNodes(); + }); + return tasks; + }) + .thenProcessAsyncOrdered( + "batchDownloadCodeBlocksData", + tasks -> requestDataStep.requestCode(tasks), + maxOutstandingRequests) + .thenProcess( + "batchPersistData", + tasks -> { + persistDataStep.persist(tasks); + return tasks; + }) + .andFinishWith( + "batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put)); + + final Pipeline> fetchHealDataPipeline = + createPipelineFrom( + "requestDequeued", + new TaskQueueIterator<>( + downloadState, () -> downloadState.dequeueTrieNodeRequestBlocking()), + bufferCapacity, + outputCounter, + true, + "world_state_download") + .thenFlatMapInParallel( + "requestLoadLocalData", + task -> loadLocalDataStep.loadLocalDataTrieNode(task, requestsToComplete), + 3, + bufferCapacity) + .inBatches(taskCountPerRequest) + .thenProcess( + "checkNewPivotBlock", + tasks -> { + pivotBlockManager.check( + blockHeader -> { + if (snapSyncState.isHealInProgress()) downloadState.clearTrieNodes(); + }); + return tasks; + }) + .thenProcessAsync( + "batchDownloadData", + tasks -> requestDataStep.requestTrieNodeByPath(tasks), + maxOutstandingRequests) + .thenProcess( + "batchPersistData", + tasks -> { + persistDataStep.persist(tasks); + return tasks; + }) + .andFinishWith( + "batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put)); + + return new SnapWorldStateDownloadProcess( + fetchAccountDataPipeline, + fetchStorageDataPipeline, + fetchBigStorageDataPipeline, + fetchCodePipeline, + fetchHealDataPipeline, + completionPipeline, + requestsToComplete); + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java new file mode 100644 index 000000000..1b27e71d6 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java @@ -0,0 +1,188 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import static org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest.createAccountRangeDataRequest; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues; + +import java.time.Clock; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.IntSupplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnapWorldStateDownloader implements WorldStateDownloader { + + private static final Logger LOG = LoggerFactory.getLogger(SnapWorldStateDownloader.class); + private final long minMillisBeforeStalling; + private final Clock clock; + private final MetricsSystem metricsSystem; + + private final EthContext ethContext; + private final InMemoryTasksPriorityQueues snapTaskCollection; + private final int hashCountPerRequest; + private final int maxOutstandingRequests; + private final int maxNodeRequestsWithoutProgress; + private final WorldStateStorage worldStateStorage; + private SnapWorldStateDownloadProcess downloadProcess; + + private final AtomicReference downloadState = new AtomicReference<>(); + + private Optional maybeCompleteTask = Optional.empty(); + + public SnapWorldStateDownloader( + final EthContext ethContext, + final WorldStateStorage worldStateStorage, + final InMemoryTasksPriorityQueues snapTaskCollection, + final int hashCountPerRequest, + final int maxOutstandingRequests, + final int maxNodeRequestsWithoutProgress, + final long minMillisBeforeStalling, + final Clock clock, + final MetricsSystem metricsSystem) { + this.ethContext = ethContext; + this.worldStateStorage = worldStateStorage; + this.snapTaskCollection = snapTaskCollection; + this.hashCountPerRequest = hashCountPerRequest; + this.maxOutstandingRequests = maxOutstandingRequests; + this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress; + this.minMillisBeforeStalling = minMillisBeforeStalling; + this.clock = clock; + this.metricsSystem = metricsSystem; + + metricsSystem.createIntegerGauge( + BesuMetricCategory.SYNCHRONIZER, + "snap_world_state_node_requests_since_last_progress_current", + "Number of world state requests made since the last time new data was returned", + downloadStateValue(SnapWorldDownloadState::getRequestsSinceLastProgress)); + + metricsSystem.createIntegerGauge( + BesuMetricCategory.SYNCHRONIZER, + "snap_world_state_inflight_requests_current", + "Number of in progress requests for world state data", + downloadStateValue(SnapWorldDownloadState::getOutstandingTaskCount)); + } + + private IntSupplier downloadStateValue(final Function getter) { + return () -> { + final SnapWorldDownloadState state = this.downloadState.get(); + return state != null ? getter.apply(state) : 0; + }; + } + + @Override + public CompletableFuture run( + final FastSyncActions fastSyncActions, final FastSyncState fastSyncState) { + synchronized (this) { + final SnapWorldDownloadState oldDownloadState = this.downloadState.get(); + if (oldDownloadState != null && oldDownloadState.isDownloading()) { + final CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally( + new IllegalStateException( + "Cannot run an already running " + this.getClass().getSimpleName())); + return failed; + } + + final SnapSyncState snapSyncState = (SnapSyncState) fastSyncState; + final BlockHeader header = fastSyncState.getPivotBlockHeader().get(); + final Hash stateRoot = header.getStateRoot(); + LOG.info( + "Downloading world state from peers for block {} ({}). State root {} pending request " + + snapTaskCollection.size(), + header.getNumber(), + header.getHash(), + stateRoot); + + final SnapWorldDownloadState newDownloadState = + new SnapWorldDownloadState( + worldStateStorage, + snapSyncState, + snapTaskCollection, + maxNodeRequestsWithoutProgress, + minMillisBeforeStalling, + metricsSystem, + clock); + + RangeManager.generateAllRanges(16) + .forEach( + (key, value) -> + newDownloadState.enqueueRequest( + createAccountRangeDataRequest(stateRoot, key, value))); + + maybeCompleteTask = Optional.of(new CompleteTaskStep(snapSyncState, metricsSystem)); + + downloadProcess = + SnapWorldStateDownloadProcess.builder() + .taskCountPerRequest(hashCountPerRequest) + .maxOutstandingRequests(maxOutstandingRequests) + .pivotBlockManager( + new DynamicPivotBlockManager<>(newDownloadState, fastSyncActions, snapSyncState)) + .loadLocalDataStep( + new LoadLocalDataStep( + worldStateStorage, newDownloadState, metricsSystem, snapSyncState)) + .requestDataStep( + new RequestDataStep( + ethContext, + worldStateStorage, + snapSyncState, + newDownloadState, + metricsSystem)) + .persistDataStep( + new PersistDataStep(snapSyncState, worldStateStorage, newDownloadState)) + .completeTaskStep(maybeCompleteTask.get()) + .downloadState(newDownloadState) + .fastSyncState(snapSyncState) + .metricsSystem(metricsSystem) + .build(); + + return newDownloadState.startDownload(downloadProcess, ethContext.getScheduler()); + } + } + + @Override + public void cancel() { + synchronized (this) { + final SnapWorldDownloadState downloadState = this.downloadState.get(); + if (downloadState != null) { + downloadState.getDownloadFuture().cancel(true); + } + } + } + + @Override + public Optional getPulledStates() { + return Optional.empty(); + } + + @Override + public Optional getKnownStates() { + return Optional.empty(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java index 8d1c0364d..3c4dc7c78 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java @@ -204,13 +204,15 @@ public class AccountRangeDataRequest extends SnapDataRequest { if (!accountValue.getStorageRoot().equals(Hash.EMPTY_TRIE_HASH)) { childRequests.add( createStorageRangeDataRequest( + getRootHash(), account.getKey(), accountValue.getStorageRoot(), startStorageRange.orElse(MIN_RANGE), endStorageRange.orElse(MAX_RANGE))); } if (!accountValue.getCodeHash().equals(Hash.EMPTY)) { - childRequests.add(createBytecodeRequest(account.getKey(), accountValue.getCodeHash())); + childRequests.add( + createBytecodeRequest(account.getKey(), getRootHash(), accountValue.getCodeHash())); } } return childRequests.stream(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountTrieNodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountTrieNodeDataRequest.java index 9d1aba9c5..3f1223bfe 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountTrieNodeDataRequest.java @@ -140,7 +140,7 @@ public class AccountTrieNodeDataRequest extends TrieNodeDataRequest { // Add code, if appropriate if (!accountValue.getCodeHash().equals(Hash.EMPTY)) { - builder.add(createBytecodeRequest(accountHash, accountValue.getCodeHash())); + builder.add(createBytecodeRequest(accountHash, getRootHash(), accountValue.getCodeHash())); } // Add storage, if appropriate if (!accountValue.getStorageRoot().equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java index f59448750..44ae1cc98 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/SnapDataRequest.java @@ -64,26 +64,27 @@ public abstract class SnapDataRequest implements TasksPriorityProvider { return new AccountRangeDataRequest(rootHash, accountHash, startStorageRange, endStorageRange); } - public StorageRangeDataRequest createStorageRangeDataRequest( + public static StorageRangeDataRequest createStorageRangeDataRequest( + final Hash rootHash, final Bytes32 accountHash, final Bytes32 storageRoot, final Bytes32 startKeyHash, final Bytes32 endKeyHash) { return new StorageRangeDataRequest( - getRootHash(), accountHash, storageRoot, startKeyHash, endKeyHash); + rootHash, accountHash, storageRoot, startKeyHash, endKeyHash); } public static AccountTrieNodeDataRequest createAccountTrieNodeDataRequest( - final Hash hash, final Bytes location, final HashSet accountHeals) { - return new AccountTrieNodeDataRequest(hash, hash, location, accountHeals); + final Hash hash, final Bytes location, final HashSet inconsistentAccounts) { + return new AccountTrieNodeDataRequest(hash, hash, location, inconsistentAccounts); } public static AccountTrieNodeDataRequest createAccountTrieNodeDataRequest( final Hash hash, final Hash rootHash, final Bytes location, - final HashSet accountHeals) { - return new AccountTrieNodeDataRequest(hash, rootHash, location, accountHeals); + final HashSet inconsistentAccounts) { + return new AccountTrieNodeDataRequest(hash, rootHash, location, inconsistentAccounts); } public static StorageTrieNodeDataRequest createStorageTrieNodeDataRequest( @@ -91,8 +92,9 @@ public abstract class SnapDataRequest implements TasksPriorityProvider { return new StorageTrieNodeDataRequest(hash, accountHash, rootHash, location); } - public BytecodeRequest createBytecodeRequest(final Bytes32 accountHash, final Bytes32 codeHash) { - return new BytecodeRequest(getRootHash(), accountHash, codeHash); + public static BytecodeRequest createBytecodeRequest( + final Bytes32 accountHash, final Hash rootHash, final Bytes32 codeHash) { + return new BytecodeRequest(rootHash, accountHash, codeHash); } public int persist( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java index 00829bb64..b1199fb18 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import kotlin.collections.ArrayDeque; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; @@ -159,7 +160,8 @@ public class StorageRangeDataRequest extends SnapDataRequest { .forEach( (key, value) -> { final StorageRangeDataRequest storageRangeDataRequest = - createStorageRangeDataRequest(accountHash, storageRoot, key, value); + createStorageRangeDataRequest( + getRootHash(), accountHash, storageRoot, key, value); storageRangeDataRequest.addStackTrie(Optional.of(stackTrie)); childRequests.add(storageRangeDataRequest); }); @@ -176,19 +178,11 @@ public class StorageRangeDataRequest extends SnapDataRequest { private int findNbRanges() { if (startKeyHash.equals(MIN_RANGE) && endKeyHash.equals(MAX_RANGE)) { - final int nbRangesNeeded = - MAX_RANGE - .toUnsignedBigInteger() - .divide( - slots - .lastKey() - .toUnsignedBigInteger() - .subtract(startKeyHash.toUnsignedBigInteger())) - .intValue(); - if (nbRangesNeeded >= MAX_CHILD) { - return MAX_CHILD; - } - return nbRangesNeeded; + return MAX_RANGE + .toUnsignedBigInteger() + .divide( + slots.lastKey().toUnsignedBigInteger().subtract(startKeyHash.toUnsignedBigInteger())) + .intValue(); } return 1; } @@ -201,6 +195,10 @@ public class StorageRangeDataRequest extends SnapDataRequest { return storageRoot; } + public TreeMap getSlots() { + return slots; + } + public Bytes32 getStartKeyHash() { return startKeyHash; } @@ -217,10 +215,15 @@ public class StorageRangeDataRequest extends SnapDataRequest { this.slots = slots; } + @VisibleForTesting + public void setProofValid(final boolean proofValid) { + isProofValid = proofValid; + } + public void addStackTrie(final Optional maybeStackTrie) { stackTrie = maybeStackTrie .filter(StackTrie::addSegment) - .orElse(new StackTrie(getRootHash(), 1, 3, startKeyHash)); + .orElse(new StackTrie(Hash.wrap(getStorageRoot()), 1, 3, startKeyHash)); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java index 9e0730ae5..bb46901c0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java @@ -18,13 +18,22 @@ import org.hyperledger.besu.services.tasks.Task; import org.hyperledger.besu.services.tasks.TasksPriorityProvider; import java.util.Iterator; +import java.util.function.Supplier; public class TaskQueueIterator implements Iterator> { - private final WorldDownloadState downloadState; + private final WorldDownloadState downloadState; + private final Supplier> supplier; public TaskQueueIterator(final WorldDownloadState downloadState) { + this(downloadState, downloadState::dequeueRequestBlocking); + } + + public TaskQueueIterator( + final WorldDownloadState downloadState, + final Supplier> supplier) { + this.supplier = supplier; this.downloadState = downloadState; } @@ -35,6 +44,6 @@ public class TaskQueueIterator @Override public Task next() { - return downloadState.dequeueRequestBlocking(); + return supplier.get(); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessageTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessageTest.java index 19554ec42..c3272d113 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessageTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/snap/StorageRangeMessageTest.java @@ -19,9 +19,8 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.TreeMap; import kotlin.collections.ArrayDeque; import org.apache.tuweni.bytes.Bytes; @@ -34,8 +33,8 @@ public final class StorageRangeMessageTest { @Test public void roundTripTest() { - final ArrayDeque> keys = new ArrayDeque<>(); - final Map storage = new HashMap<>(); + final ArrayDeque> keys = new ArrayDeque<>(); + final TreeMap storage = new TreeMap<>(); storage.put(Hash.wrap(Bytes32.leftPad(Bytes.of(1))), Bytes32.random()); keys.add(storage); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java index 584c8e1d8..76092c361 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloaderFactory; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index 1191f7c0b..99ddee822 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -38,6 +38,7 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import java.util.ArrayList; @@ -59,6 +60,7 @@ public class FastSyncActionsTest { private final SynchronizerConfiguration.Builder syncConfigBuilder = new SynchronizerConfiguration.Builder().syncMode(SyncMode.FAST).fastSyncPivotDistance(1000); + private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class); private final FastSyncStateStorage fastSyncStateStorage = mock(FastSyncStateStorage.class); private final AtomicInteger timeoutCount = new AtomicInteger(0); private SynchronizerConfiguration syncConfig = syncConfigBuilder.build(); @@ -91,6 +93,7 @@ public class FastSyncActionsTest { blockchainSetupUtil.getTransactionPool(), EthProtocolConfiguration.defaultConfig()); fastSyncActions = createFastSyncActions(syncConfig); + when(worldStateStorage.isWorldStateAvailable(any(), any())).thenReturn(true); } @Test @@ -412,6 +415,7 @@ public class FastSyncActionsTest { final EthContext ethContext = ethProtocolManager.ethContext(); return new FastSyncActions( syncConfig, + worldStateStorage, protocolSchedule, protocolContext, ethContext, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index bb3887742..c6dd65a72 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync; import static org.assertj.core.api.Assertions.assertThat; import static org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode.LIGHT_SKIP_DETACHED; import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.TOO_MANY_PEERS; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -36,6 +37,7 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import java.util.Arrays; @@ -54,6 +56,7 @@ import org.junit.runners.Parameterized.Parameters; public class FastSyncChainDownloaderTest { private final FastSyncValidationPolicy validationPolicy = mock(FastSyncValidationPolicy.class); + private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class); protected ProtocolSchedule protocolSchedule; protected EthProtocolManager ethProtocolManager; @@ -79,6 +82,7 @@ public class FastSyncChainDownloaderTest { @Before public void setup() { when(validationPolicy.getValidationModeForNextBlock()).thenReturn(LIGHT_SKIP_DETACHED); + when(worldStateStorage.isWorldStateAvailable(any(), any())).thenReturn(true); final BlockchainSetupUtil localBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat); localBlockchain = localBlockchainSetup.getBlockchain(); otherBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat); @@ -103,6 +107,7 @@ public class FastSyncChainDownloaderTest { final SynchronizerConfiguration syncConfig, final long pivotBlockNumber) { return FastSyncChainDownloader.create( syncConfig, + worldStateStorage, protocolSchedule, protocolContext, ethContext, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java index 2128de059..b672b7737 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.assertj.core.api.Assertions; +import org.junit.Before; import org.junit.Test; public class FastSyncDownloaderTest { @@ -67,8 +68,8 @@ public class FastSyncDownloaderTest { private final Path fastSyncDataDirectory = null; - private final FastSyncDownloader downloader = - new FastSyncDownloader( + private final FastSyncDownloader downloader = + new FastSyncDownloader<>( fastSyncActions, worldStateStorage, worldStateDownloader, @@ -77,6 +78,11 @@ public class FastSyncDownloaderTest { fastSyncDataDirectory, FastSyncState.EMPTY_SYNC_STATE); + @Before + public void setup() { + when(worldStateStorage.isWorldStateAvailable(any(), any())).thenReturn(true); + } + @Test public void shouldCompleteFastSyncSuccessfully() { final FastSyncState selectPivotBlockState = new FastSyncState(50); @@ -122,8 +128,8 @@ public class FastSyncDownloaderTest { any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) .thenReturn(completedFuture(null)); - final FastSyncDownloader resumedDownloader = - new FastSyncDownloader( + final FastSyncDownloader resumedDownloader = + new FastSyncDownloader<>( fastSyncActions, worldStateStorage, worldStateDownloader, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java index 5b54fa830..e24a34202 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.verify; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; -import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import java.util.Optional; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java index 3ef590451..cb7557058 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.services.pipeline.Pipe; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java index d7f329495..8b350334e 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java @@ -24,7 +24,6 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; -import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie; import org.hyperledger.besu.ethereum.trie.SimpleMerklePatriciaTrie; import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStepTest.java index 84b1c3400..1173c6e5f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStepTest.java @@ -28,7 +28,6 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; -import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.services.tasks.Task; import java.util.List; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StubTask.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StubTask.java similarity index 91% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StubTask.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StubTask.java index 3965969bc..429238789 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StubTask.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StubTask.java @@ -12,10 +12,9 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; import org.hyperledger.besu.services.tasks.Task; import java.util.Optional; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStepTest.java new file mode 100644 index 000000000..06b0ee481 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStepTest.java @@ -0,0 +1,149 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.services.tasks.Task; + +import java.util.HashSet; +import java.util.List; +import java.util.Optional; + +import org.apache.tuweni.bytes.Bytes; +import org.junit.Before; +import org.junit.Test; + +public class CompleteTaskStepTest { + + private static final Hash HASH = Hash.hash(Bytes.of(1, 2, 3)); + + private final SnapSyncState snapSyncState = mock(SnapSyncState.class); + private final SnapWorldDownloadState downloadState = mock(SnapWorldDownloadState.class); + + private final BlockHeader blockHeader = + new BlockHeaderTestFixture().stateRoot(HASH).buildHeader(); + + private final CompleteTaskStep completeTaskStep = + new CompleteTaskStep(snapSyncState, new NoOpMetricsSystem()); + + @Before + public void setup() { + when(snapSyncState.getPivotBlockHeader()).thenReturn(Optional.of(blockHeader)); + } + + @Test + public void shouldMarkAccountTrieNodeTaskAsFailedIfItDoesNotHaveData() { + final StubTask task = + new StubTask( + SnapDataRequest.createAccountTrieNodeDataRequest(HASH, Bytes.EMPTY, new HashSet<>())); + + completeTaskStep.markAsCompleteOrFailed(downloadState, task); + + assertThat(task.isCompleted()).isFalse(); + assertThat(task.isFailed()).isTrue(); + verify(downloadState).notifyTaskAvailable(); + verify(downloadState, never()).checkCompletion(blockHeader); + } + + @Test + public void shouldMarkAccountTrieNodeTaskCompleteIfItDoesNotHaveDataAndExpired() { + final StubTask task = + new StubTask( + SnapDataRequest.createAccountTrieNodeDataRequest(HASH, Bytes.EMPTY, new HashSet<>())); + + when(snapSyncState.isExpired(any())).thenReturn(true); + + completeTaskStep.markAsCompleteOrFailed(downloadState, task); + + assertThat(task.isCompleted()).isTrue(); + verify(downloadState).notifyTaskAvailable(); + verify(downloadState).checkCompletion(blockHeader); + } + + @Test + public void shouldMarkStorageTrieNodeTaskAsFailedIfItDoesNotHaveData() { + final StubTask task = + new StubTask( + SnapDataRequest.createStorageTrieNodeDataRequest(HASH, HASH, HASH, Bytes.EMPTY)); + + completeTaskStep.markAsCompleteOrFailed(downloadState, task); + + assertThat(task.isCompleted()).isFalse(); + assertThat(task.isFailed()).isTrue(); + + verify(downloadState, never()).checkCompletion(blockHeader); + } + + @Test + public void shouldMarkStorageTrieNodeTaskCompleteIfItDoesNotHaveDataAndExpired() { + final StubTask task = + new StubTask( + SnapDataRequest.createStorageTrieNodeDataRequest(HASH, HASH, HASH, Bytes.EMPTY)); + + when(snapSyncState.isExpired(any())).thenReturn(true); + + completeTaskStep.markAsCompleteOrFailed(downloadState, task); + + assertThat(task.isCompleted()).isTrue(); + assertThat(task.isFailed()).isFalse(); + + verify(downloadState).checkCompletion(blockHeader); + } + + @Test + public void shouldMarkSnapsyncTaskCompleteWhenData() { + final List> requests = TaskGenerator.createAccountRequest(true); + requests.stream() + .map(StubTask.class::cast) + .forEach( + snapDataRequestTask -> { + completeTaskStep.markAsCompleteOrFailed(downloadState, snapDataRequestTask); + + assertThat(snapDataRequestTask.isCompleted()).isTrue(); + assertThat(snapDataRequestTask.isFailed()).isFalse(); + }); + verify(downloadState, times(3)).checkCompletion(blockHeader); + verify(downloadState, times(3)).notifyTaskAvailable(); + } + + @Test + public void shouldMarkSnapsyncTaskAsFailedWhenNoData() { + final List> requests = TaskGenerator.createAccountRequest(false); + requests.stream() + .map(StubTask.class::cast) + .forEach( + snapDataRequestTask -> { + completeTaskStep.markAsCompleteOrFailed(downloadState, snapDataRequestTask); + + assertThat(snapDataRequestTask.isCompleted()).isFalse(); + assertThat(snapDataRequestTask.isFailed()).isTrue(); + }); + + verify(downloadState, times(3)).notifyTaskAvailable(); + verify(downloadState, never()).checkCompletion(blockHeader); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManagerTest.java new file mode 100644 index 000000000..4fe0443da --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManagerTest.java @@ -0,0 +1,123 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; + +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +import org.junit.Before; +import org.junit.Test; + +public class DynamicPivotBlockManagerTest { + + private final SnapSyncState snapSyncState = mock(SnapSyncState.class); + private final SnapWorldDownloadState downloadState = mock(SnapWorldDownloadState.class); + private final FastSyncActions fastSyncActions = mock(FastSyncActions.class); + private final SyncState syncState = mock(SyncState.class); + + private final DynamicPivotBlockManager dynamicPivotBlockManager = + new DynamicPivotBlockManager<>(downloadState, fastSyncActions, snapSyncState); + + @Before + public void setup() { + when(fastSyncActions.getSyncState()).thenReturn(syncState); + } + + @Test + public void shouldNotSearchNewPivotBlockWhenCloseToTheHead() { + + when(syncState.bestChainHeight()).thenReturn(1000L); + + when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(999)); + dynamicPivotBlockManager.check( + blockHeader -> { + fail("new pivot block not expected"); + }); + verify(fastSyncActions, never()).waitForSuitablePeers(any()); + } + + @Test + public void shouldSearchNewPivotBlockWhenNotCloseToTheHead() { + + final CompletableFuture COMPLETE = + completedFuture(FastSyncState.EMPTY_SYNC_STATE); + final FastSyncState selectPivotBlockState = new FastSyncState(1090); + final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(1090).buildHeader(); + final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader); + when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE); + when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE)) + .thenReturn(completedFuture(selectPivotBlockState)); + when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) + .thenReturn(completedFuture(downloadPivotBlockHeaderState)); + + when(syncState.bestChainHeight()).thenReturn(1000L); + + when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(939)); + dynamicPivotBlockManager.check( + blockHeader -> { + fail("new pivot block not expected"); + }); + verify(fastSyncActions).waitForSuitablePeers(any()); + } + + @Test + public void shouldSwitchToNewPivotBlockWhenNeeded() { + + final CompletableFuture COMPLETE = + completedFuture(FastSyncState.EMPTY_SYNC_STATE); + final FastSyncState selectPivotBlockState = new FastSyncState(1060); + final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(1060).buildHeader(); + final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader); + when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE); + when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE)) + .thenReturn(completedFuture(selectPivotBlockState)); + when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) + .thenReturn(completedFuture(downloadPivotBlockHeaderState)); + + when(syncState.bestChainHeight()).thenReturn(1000L); + + when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(939)); + dynamicPivotBlockManager.check( + blockHeader -> { + fail("new pivot block not expected"); + }); + + when(syncState.bestChainHeight()).thenReturn(1066L); + + dynamicPivotBlockManager.check( + blockHeader -> { + assertThat(blockHeader.getNumber()).isEqualTo(pivotBlockHeader.getNumber()); + }); + + verify(snapSyncState).setCurrentHeader(pivotBlockHeader); + verify(fastSyncActions).waitForSuitablePeers(any()); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStepTest.java new file mode 100644 index 000000000..4363cf8e0 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStepTest.java @@ -0,0 +1,102 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountTrieNodeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.services.pipeline.Pipe; +import org.hyperledger.besu.services.tasks.Task; + +import java.util.HashSet; +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.tuweni.bytes.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class LoadLocalDataStepTest { + + private static final Bytes DATA = Bytes.of(1, 2, 3); + private static final Hash HASH = Hash.hash(DATA); + + private final BlockHeader blockHeader = + new BlockHeaderTestFixture().stateRoot(HASH).buildHeader(); + private final AccountTrieNodeDataRequest request = + SnapDataRequest.createAccountTrieNodeDataRequest( + HASH, Bytes.fromHexString("0x01"), new HashSet<>()); + private final Task task = new StubTask(request); + + private final Pipe> completedTasks = + new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); + + private final SnapSyncState snapSyncState = mock(SnapSyncState.class); + private final SnapWorldDownloadState downloadState = mock(SnapWorldDownloadState.class); + private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class); + private final WorldStateStorage.Updater updater = mock(WorldStateStorage.Updater.class); + + private final LoadLocalDataStep loadLocalDataStep = + new LoadLocalDataStep( + worldStateStorage, downloadState, new NoOpMetricsSystem(), snapSyncState); + + @Before + public void setup() { + when(snapSyncState.hasPivotBlockHeader()).thenReturn(true); + when(snapSyncState.getPivotBlockHeader()).thenReturn(Optional.of(blockHeader)); + } + + @Test + public void shouldReturnStreamWithUnchangedTaskWhenDataNotPresent() { + final Stream> output = + loadLocalDataStep.loadLocalDataTrieNode(task, completedTasks); + + assertThat(completedTasks.poll()).isNull(); + assertThat(output).containsExactly(task); + } + + @Test + public void shouldReturnEmptyStreamAndSendTaskToCompletedPipeWhenDataIsPresent() { + when(worldStateStorage.getAccountStateTrieNode(any(), any())).thenReturn(Optional.of(DATA)); + when(worldStateStorage.updater()).thenReturn(updater); + + final Stream> output = + loadLocalDataStep.loadLocalDataTrieNode(task, completedTasks); + + assertThat(completedTasks.poll()).isSameAs(task); + assertThat(request.isValid()).isTrue(); + assertThat(output).isEmpty(); + + verify(updater).commit(); + Mockito.reset(updater); + + // Should not require persisting. + request.persist(worldStateStorage, updater, downloadState, snapSyncState); + verifyNoInteractions(updater); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java new file mode 100644 index 000000000..dc6271e07 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java @@ -0,0 +1,108 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest; +import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie; +import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.metrics.RunnableCounter; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.services.tasks.Task; + +import java.util.List; + +import org.apache.tuweni.bytes.Bytes; +import org.junit.Before; +import org.junit.Test; + +public class PersistDataStepTest { + + private final WorldStateStorage worldStateStorage = + new InMemoryKeyValueStorageProvider().createWorldStateStorage(DataStorageFormat.FOREST); + private final SnapSyncState snapSyncState = mock(SnapSyncState.class); + private final SnapWorldDownloadState downloadState = mock(SnapWorldDownloadState.class); + + private final PersistDataStep persistDataStep = + new PersistDataStep(snapSyncState, worldStateStorage, downloadState); + + @Before + public void setUp() { + final RunnableCounter runnableCounter = + new RunnableCounter(NoOpMetricsSystem.NO_OP_COUNTER, () -> {}, 100); + when(downloadState.getGeneratedNodes()).thenReturn(runnableCounter); + } + + @Test + public void shouldPersistDataWhenPresent() { + final List> tasks = TaskGenerator.createAccountRequest(true); + final List> result = persistDataStep.persist(tasks); + + assertThat(result).isSameAs(tasks); + + assertDataPersisted(tasks); + } + + @Test + public void shouldSkipPersistDataWhenNoData() { + final List> tasks = TaskGenerator.createAccountRequest(false); + final List> result = persistDataStep.persist(tasks); + + assertThat(result).isSameAs(tasks); + assertThat(worldStateStorage.getNodeData(Bytes.EMPTY, tasks.get(0).getData().getRootHash())) + .isEmpty(); + } + + private void assertDataPersisted(final List> tasks) { + tasks.forEach( + task -> { + if (task.getData() instanceof AccountRangeDataRequest) { + final AccountRangeDataRequest data = (AccountRangeDataRequest) task.getData(); + StoredMerklePatriciaTrie trie = + new StoredMerklePatriciaTrie<>( + worldStateStorage::getAccountStateTrieNode, data.getRootHash(), b -> b, b -> b); + data.getAccounts().forEach((key, value) -> assertThat(trie.get(key)).isPresent()); + } else if (task.getData() instanceof StorageRangeDataRequest) { + final StorageRangeDataRequest data = (StorageRangeDataRequest) task.getData(); + final StoredMerklePatriciaTrie trie = + new StoredMerklePatriciaTrie<>( + (location, hash) -> + worldStateStorage.getAccountStorageTrieNode( + Hash.wrap(data.getAccountHash()), location, hash), + data.getStorageRoot(), + b -> b, + b -> b); + data.getSlots().forEach((key, value) -> assertThat(trie.get(key)).isPresent()); + } else if (task.getData() instanceof BytecodeRequest) { + final BytecodeRequest data = (BytecodeRequest) task.getData(); + assertThat( + worldStateStorage.getCode(data.getCodeHash(), Hash.wrap(data.getAccountHash()))) + .isPresent(); + } else { + fail("not expected message"); + } + }); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StubTask.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StubTask.java new file mode 100644 index 000000000..6c5dd72f3 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StubTask.java @@ -0,0 +1,52 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.services.tasks.Task; + +public class StubTask implements Task { + + private final SnapDataRequest data; + private boolean completed = false; + private boolean failed = false; + + public StubTask(final SnapDataRequest data) { + this.data = data; + } + + @Override + public SnapDataRequest getData() { + return data; + } + + @Override + public void markCompleted() { + completed = true; + } + + @Override + public void markFailed() { + failed = true; + } + + public boolean isCompleted() { + return completed; + } + + public boolean isFailed() { + return failed; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/TaskGenerator.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/TaskGenerator.java new file mode 100644 index 000000000..e7d0cac32 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/TaskGenerator.java @@ -0,0 +1,144 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.snapsync; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; +import org.hyperledger.besu.ethereum.core.TrieGenerator; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest; +import org.hyperledger.besu.ethereum.rlp.RLP; +import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie; +import org.hyperledger.besu.ethereum.trie.RangeStorageEntriesCollector; +import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie; +import org.hyperledger.besu.ethereum.trie.TrieIterator; +import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; +import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.services.tasks.Task; + +import java.util.List; +import java.util.TreeMap; + +import kotlin.collections.ArrayDeque; +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; + +public class TaskGenerator { + + public static List> createAccountRequest(final boolean withData) { + final WorldStateStorage worldStateStorage = + new InMemoryKeyValueStorageProvider().createWorldStateStorage(DataStorageFormat.FOREST); + final MerklePatriciaTrie trie = + TrieGenerator.generateTrie(worldStateStorage, 1); + final RangeStorageEntriesCollector collector = + RangeStorageEntriesCollector.createCollector( + Bytes32.ZERO, RangeManager.MAX_RANGE, 1, Integer.MAX_VALUE); + final TrieIterator visitor = RangeStorageEntriesCollector.createVisitor(collector); + final TreeMap accounts = + (TreeMap) + trie.entriesFrom( + root -> + RangeStorageEntriesCollector.collectEntries( + collector, visitor, root, Bytes32.ZERO)); + + final Hash rootHash = Hash.wrap(trie.getRootHash()); + + final AccountRangeDataRequest accountRangeDataRequest = + SnapDataRequest.createAccountRangeDataRequest( + rootHash, RangeManager.MIN_RANGE, RangeManager.MAX_RANGE); + if (withData) { + accountRangeDataRequest.setAccounts(accounts); + accountRangeDataRequest.setProofs(new ArrayDeque<>()); + } + + final StateTrieAccountValue stateTrieAccountValue = + StateTrieAccountValue.readFrom(RLP.input(accounts.firstEntry().getValue())); + final Hash accountHash = Hash.wrap(accounts.firstKey()); + + final StorageRangeDataRequest storageRangeDataRequest = + createStorageRangeDataRequest( + worldStateStorage, + rootHash, + accountHash, + stateTrieAccountValue.getStorageRoot(), + withData); + final BytecodeRequest bytecodeRequest = + createBytecodeDataRequest( + worldStateStorage, + rootHash, + accountHash, + stateTrieAccountValue.getCodeHash(), + withData); + + return List.of( + new StubTask(accountRangeDataRequest), + new StubTask(storageRangeDataRequest), + new StubTask(bytecodeRequest)); + } + + private static StorageRangeDataRequest createStorageRangeDataRequest( + final WorldStateStorage worldStateStorage, + final Hash rootHash, + final Hash accountHash, + final Bytes32 storageRoot, + final boolean withData) { + + final RangeStorageEntriesCollector collector = + RangeStorageEntriesCollector.createCollector( + Bytes32.ZERO, RangeManager.MAX_RANGE, 100, Integer.MAX_VALUE); + final StoredMerklePatriciaTrie storageTrie = + new StoredMerklePatriciaTrie<>( + (location, hash) -> + worldStateStorage.getAccountStorageTrieNode(accountHash, location, hash), + storageRoot, + b -> b, + b -> b); + + final TrieIterator visitor = RangeStorageEntriesCollector.createVisitor(collector); + final TreeMap slots = + (TreeMap) + storageTrie.entriesFrom( + root -> + RangeStorageEntriesCollector.collectEntries( + collector, visitor, root, Bytes32.ZERO)); + + final StorageRangeDataRequest request = + SnapDataRequest.createStorageRangeDataRequest( + rootHash, accountHash, storageRoot, RangeManager.MIN_RANGE, RangeManager.MAX_RANGE); + if (withData) { + request.setProofValid(true); + request.setSlots(slots); + request.setProofs(new ArrayDeque<>()); + } + return request; + } + + private static BytecodeRequest createBytecodeDataRequest( + final WorldStateStorage worldStateStorage, + final Hash rootHash, + final Hash accountHash, + final Hash codeHash, + final boolean withData) { + final BytecodeRequest request = + SnapDataRequest.createBytecodeRequest(accountHash, rootHash, codeHash); + if (withData) { + request.setCode(worldStateStorage.getCode(codeHash, accountHash).get()); + } + return request; + } +} diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/BatchingReadPipe.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/BatchingReadPipe.java index 0f2808efd..8c059d0d1 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/BatchingReadPipe.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/BatchingReadPipe.java @@ -19,18 +19,29 @@ import org.hyperledger.besu.plugin.services.metrics.Counter; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Function; public class BatchingReadPipe implements ReadPipe> { private final ReadPipe input; private final int maximumBatchSize; private final Counter batchCounter; + private final Function, Integer> stopBatchCondition; public BatchingReadPipe( final ReadPipe input, final int maximumBatchSize, final Counter batchCounter) { + this(input, maximumBatchSize, batchCounter, ts -> maximumBatchSize - ts.size()); + } + + public BatchingReadPipe( + final ReadPipe input, + final int maximumBatchSize, + final Counter batchCounter, + final Function, Integer> batchEndCondition) { this.input = input; this.maximumBatchSize = maximumBatchSize; this.batchCounter = batchCounter; + this.stopBatchCondition = batchEndCondition; } @Override @@ -53,7 +64,15 @@ public class BatchingReadPipe implements ReadPipe> { } final List batch = new ArrayList<>(); batch.add(firstItem); - input.drainTo(batch, maximumBatchSize - 1); + Integer remainingData = stopBatchCondition.apply(batch); + while (remainingData > 0 + && (batch.size() + remainingData) <= maximumBatchSize + && input.hasMore()) { + if (input.drainTo(batch, remainingData) == 0) { + break; + } + remainingData = stopBatchCondition.apply(batch); + } batchCounter.inc(); return batch; } @@ -61,7 +80,15 @@ public class BatchingReadPipe implements ReadPipe> { @Override public List poll() { final List batch = new ArrayList<>(); - input.drainTo(batch, maximumBatchSize); + Integer remainingData = stopBatchCondition.apply(batch); + while (remainingData > 0 + && (batch.size() + remainingData) <= maximumBatchSize + && input.hasMore()) { + if (input.drainTo(batch, remainingData) == 0) { + break; + } + remainingData = stopBatchCondition.apply(batch); + } if (batch.isEmpty()) { // Poll has to return null if the pipe is empty return null; @@ -71,10 +98,12 @@ public class BatchingReadPipe implements ReadPipe> { } @Override - public void drainTo(final Collection> output, final int maxElements) { + public int drainTo(final Collection> output, final int maxElements) { final List nextBatch = poll(); if (nextBatch != null) { output.add(nextBatch); + return nextBatch.size(); } + return 0; } } diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipe.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipe.java index 5e117d6e5..f8e85deb6 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipe.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipe.java @@ -117,9 +117,10 @@ public class Pipe implements ReadPipe, WritePipe { } @Override - public void drainTo(final Collection output, final int maxElements) { + public int drainTo(final Collection output, final int maxElements) { final int count = queue.drainTo(output, maxElements); outputCounter.inc(count); + return count; } @Override diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java index 61dcdac88..67d7dd751 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java @@ -253,6 +253,36 @@ public class PipelineBuilder { pipelineName); } + /** + * Batches items into groups of at most maximumBatchSize. Batches are created eagerly to + * minimize delay so may not be full. + * + *

Order of items is preserved. + * + *

The output buffer size is reduced to bufferSize / maximumBatchSize + 1. + * + * @param maximumBatchSize the maximum number of items to include in a batch. + * @param stopBatchCondition the condition before ending the batch + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder> inBatches( + final int maximumBatchSize, final Function, Integer> stopBatchCondition) { + return new PipelineBuilder<>( + inputPipe, + stages, + pipes, + lastStageName, + new BatchingReadPipe<>( + pipeEnd, + maximumBatchSize, + outputCounter.labels(lastStageName + "_outputPipe", "batches"), + stopBatchCondition), + (int) Math.ceil(((double) bufferSize) / maximumBatchSize), + outputCounter, + tracingEnabled, + pipelineName); + } + /** * Adds a 1-to-many processing stage to the pipeline. For each item in the stream, mapper * is called and each item of the {@link Stream} it returns is output as an individual item. The diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/ReadPipe.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/ReadPipe.java index 8d174d34a..0e453decf 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/ReadPipe.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/ReadPipe.java @@ -61,6 +61,7 @@ public interface ReadPipe { * * @param output the collection to transfer elements into * @param maxElements the maximum number of elements to transfer + * @return the number of elements drained in the pipe */ - void drainTo(Collection output, int maxElements); + int drainTo(Collection output, int maxElements); }