add snapsync as experimental feature (#3710)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
This commit is contained in:
matkt
2022-04-14 15:55:15 +02:00
committed by GitHub
parent 918b5359b1
commit d5658c2a65
22 changed files with 237 additions and 245 deletions

View File

@@ -9,6 +9,7 @@
### Additions and Improvements
- Onchain node permissioning - log the enodeURL that was previously only throwing an IllegalStateException during the isPermitted check [#3697](https://github.com/hyperledger/besu/pull/3697)
- \[EXPERIMENTAL\] Add snapsync `--sync-mode="X_SNAP"` (only as client) [#3710](https://github.com/hyperledger/besu/pull/3710)
### Bug Fixes

View File

@@ -500,14 +500,13 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
blockchain, worldStateArchive, protocolSchedule, consensusContextFactory);
}
@SuppressWarnings("unused")
private Optional<SnapProtocolManager> createSnapProtocolManager(
final List<PeerValidator> peerValidators,
final EthPeers ethPeers,
final EthMessages snapMessages,
final WorldStateArchive worldStateArchive) {
// TODO implement method when flag will be available
return Optional.empty();
return Optional.of(
new SnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive));
}
private WorldStateArchive createWorldStateArchive(

View File

@@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@@ -100,8 +101,7 @@ public class DefaultSynchronizer implements Synchronizer {
metricsSystem,
terminationCondition);
// TODO add this code when snapsync will be ready
/*if (SyncMode.X_SNAP.equals(syncConfig.getSyncMode())) {
if (SyncMode.X_SNAP.equals(syncConfig.getSyncMode())) {
this.fastSyncDownloader =
SnapDownloaderFactory.createSnapDownloader(
syncConfig,
@@ -125,19 +125,7 @@ public class DefaultSynchronizer implements Synchronizer {
worldStateStorage,
syncState,
clock);
}*/
this.fastSyncDownloader =
FastDownloaderFactory.create(
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock);
}
metricsSystem.createLongGauge(
BesuMetricCategory.ETHEREUM,

View File

@@ -42,7 +42,7 @@ public class CompleteTaskStep {
public synchronized void markAsCompleteOrFailed(
final SnapWorldDownloadState downloadState, final Task<SnapDataRequest> task) {
if (task.getData().isValid()
if (task.getData().isResponseReceived()
|| (task.getData() instanceof TrieNodeDataRequest
&& task.getData().isExpired(snapSyncState))) {
completedRequestsCounter.inc();

View File

@@ -45,7 +45,7 @@ public class LoadLocalDataStep {
existingNodeCounter =
metricsSystem.createCounter(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_existing_nodes_total",
"snap_world_state_existing_trie_nodes_total",
"Total number of node data requests completed using existing data");
this.snapSyncState = snapSyncState;
}

View File

@@ -40,7 +40,7 @@ public class PersistDataStep {
public List<Task<SnapDataRequest>> persist(final List<Task<SnapDataRequest>> tasks) {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
for (Task<SnapDataRequest> task : tasks) {
if (task.getData().isValid()) {
if (task.getData().isResponseReceived()) {
// enqueue child requests
final Stream<SnapDataRequest> childRequests =
task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState);

View File

@@ -87,10 +87,8 @@ public class RequestDataStep {
if (response != null) {
downloadState.removeOutstandingTask(getAccountTask);
accountDataRequest.setRootHash(blockHeader.getStateRoot());
accountDataRequest.setAccounts(response.accounts());
accountDataRequest.setProofs(response.proofs());
accountDataRequest.checkProof(
downloadState, worldStateProofProvider, fastSyncState);
accountDataRequest.addResponse(
worldStateProofProvider, response.accounts(), response.proofs());
}
return requestTask;
});
@@ -127,10 +125,11 @@ public class RequestDataStep {
final StorageRangeDataRequest request =
(StorageRangeDataRequest) requestTasks.get(i).getData();
request.setRootHash(blockHeader.getStateRoot());
request.setSlots(response.slots().get(i));
request.setProofs(
request.addResponse(
downloadState,
worldStateProofProvider,
response.slots().get(i),
i < response.slots().size() - 1 ? new ArrayDeque<>() : response.proofs());
request.checkProof(downloadState, worldStateProofProvider, fastSyncState);
}
}
return requestTasks;

View File

@@ -0,0 +1,37 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.metrics.Counter;
public class SnapCounter extends RunnableCounter {
long oldRunValue = 0;
public SnapCounter(final Counter backedCounter, final Runnable task, final int step) {
super(backedCounter, task, step);
}
@Override
public void inc(final long amount) {
backedCounter.inc(amount);
long value = stepCounter.addAndGet(amount);
if ((value - oldRunValue) >= step) {
oldRunValue = value;
task.run();
}
}
}

View File

@@ -30,7 +30,6 @@ import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
@@ -88,7 +87,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
worldStateStorage.clear();
final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection =
createSnapWorldStateDownloaderTaskCollection(metricsSystem);
createSnapWorldStateDownloaderTaskCollection();
final WorldStateDownloader snapWorldStateDownloader =
new SnapWorldStateDownloader(
ethContext,
@@ -121,15 +120,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
}
private static InMemoryTasksPriorityQueues<SnapDataRequest>
createSnapWorldStateDownloaderTaskCollection(final MetricsSystem metricsSystem) {
final InMemoryTasksPriorityQueues<SnapDataRequest> taskCollection =
new InMemoryTasksPriorityQueues<>();
metricsSystem.createLongGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_pending_requests_current",
"Number of pending requests for snap sync world state download",
taskCollection::size);
return taskCollection;
createSnapWorldStateDownloaderTaskCollection() {
return new InMemoryTasksPriorityQueues<>();
}
}

View File

@@ -46,7 +46,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
private static final Logger LOG = LoggerFactory.getLogger(SnapWorldDownloadState.class);
private static final int DISPLAY_PROGRESS_STEP = 100000;
private static final int DISPLAY_SNAP_PROGRESS_STEP = 200000;
private static final int DISPLAY_HEAL_PROGRESS_STEP = 10000;
protected final InMemoryTaskQueue<SnapDataRequest> pendingAccountRequests =
new InMemoryTaskQueue<>();
@@ -62,7 +63,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
private final SnapSyncState snapSyncState;
// metrics round the snapsync
// metrics around the snapsync
private final RunnableCounter generatedNodes;
private final RunnableCounter healedNodes;
@@ -82,21 +83,46 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
clock);
this.snapSyncState = snapSyncState;
this.generatedNodes =
new RunnableCounter(
new SnapCounter(
metricsSystem.createCounter(
BesuMetricCategory.SYNCHRONIZER,
"snapsync_world_state_generated_nodes_total",
"snap_world_state_generated_nodes_total",
"Total number of data nodes generated as part of snap sync world state download"),
this::displayWorldStateSyncProgress,
DISPLAY_PROGRESS_STEP);
DISPLAY_SNAP_PROGRESS_STEP);
this.healedNodes =
new RunnableCounter(
new SnapCounter(
metricsSystem.createCounter(
BesuMetricCategory.SYNCHRONIZER,
"snapsync_world_state_healed_nodes_total",
"snap_world_state_healed_nodes_total",
"Total number of data nodes healed as part of snap sync world state heal process"),
this::displayHealProgress,
DISPLAY_PROGRESS_STEP);
DISPLAY_HEAL_PROGRESS_STEP);
metricsSystem.createLongGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_pending_account_requests_current",
"Number of account pending requests for snap sync world state download",
pendingAccountRequests::size);
metricsSystem.createLongGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_pending_storage_requests_current",
"Number of storage pending requests for snap sync world state download",
pendingStorageRequests::size);
metricsSystem.createLongGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_pending_big_storage_requests_current",
"Number of storage pending requests for snap sync world state download",
pendingBigStorageRequests::size);
metricsSystem.createLongGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_pending_code_requests_current",
"Number of code pending requests for snap sync world state download",
pendingCodeRequests::size);
metricsSystem.createLongGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_pending_trie_node_requests_current",
"Number of trie node pending requests for snap sync world state download",
pendingTrieNodeRequests::size);
}
@Override
@@ -218,7 +244,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
public synchronized Task<SnapDataRequest> dequeueAccountRequestBlocking() {
return dequeueRequestBlocking(
List.of(pendingStorageRequests, pendingCodeRequests), List.of(pendingAccountRequests));
List.of(pendingStorageRequests, pendingBigStorageRequests, pendingCodeRequests),
List.of(pendingAccountRequests));
}
public synchronized Task<SnapDataRequest> dequeueBigStorageRequestBlocking() {
@@ -244,6 +271,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
pendingTrieNodeRequests.clearInternalQueues();
pendingCodeRequests.clearInternalQueue();
snapSyncState.setHealStatus(false);
snapSyncState.getPivotBlockHeader().ifPresent(this::checkCompletion);
}
public RunnableCounter getGeneratedNodes() {
@@ -255,7 +283,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
}
private void displayWorldStateSyncProgress() {
LOG.info("Generated {} world state nodes", generatedNodes.get());
LOG.info("Retrieved {} world state nodes", generatedNodes.get());
}
private void displayHealProgress() {

View File

@@ -235,11 +235,11 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
return tasks;
})
.thenProcessAsync(
"batchDownloadData",
"batchDownloadAccountData",
requestTask -> requestDataStep.requestAccount(requestTask),
maxOutstandingRequests)
.thenProcess("batchPersistData", task -> persistDataStep.persist(task))
.andFinishWith("batchDataDownloaded", requestsToComplete::put);
.thenProcess("batchPersistAccountData", task -> persistDataStep.persist(task))
.andFinishWith("batchAccountDataDownloaded", requestsToComplete::put);
final Pipeline<Task<SnapDataRequest>> fetchStorageDataPipeline =
createPipelineFrom(
@@ -258,12 +258,12 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadData",
"batchDownloadStorageData",
requestTask -> requestDataStep.requestStorage(requestTask),
maxOutstandingRequests)
.thenProcess("batchPersistData", task -> persistDataStep.persist(task))
.thenProcess("batchPersistStorageData", task -> persistDataStep.persist(task))
.andFinishWith(
"batchDataDownloaded",
"batchStorageDataDownloaded",
tasks -> {
tasks.forEach(requestsToComplete::put);
});
@@ -284,17 +284,17 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadData",
"batchDownloadBigStorageData",
requestTask -> requestDataStep.requestStorage(List.of(requestTask)),
maxOutstandingRequests)
.thenProcess(
"batchPersistData",
"batchPersistBigStorageData",
task -> {
persistDataStep.persist(task);
return task;
})
.andFinishWith(
"batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put));
"batchBigStorageDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put));
final Pipeline<Task<SnapDataRequest>> fetchCodePipeline =
createPipelineFrom(
@@ -326,21 +326,21 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadCodeBlocksData",
"batchDownloadCodeData",
tasks -> requestDataStep.requestCode(tasks),
maxOutstandingRequests)
.thenProcess(
"batchPersistData",
"batchPersistCodeData",
tasks -> {
persistDataStep.persist(tasks);
return tasks;
})
.andFinishWith(
"batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put));
"batchCodeDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put));
final Pipeline<Task<SnapDataRequest>> fetchHealDataPipeline =
createPipelineFrom(
"requestDequeued",
"requestTrieNodeDequeued",
new TaskQueueIterator<>(
downloadState, () -> downloadState.dequeueTrieNodeRequestBlocking()),
bufferCapacity,
@@ -348,7 +348,7 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
true,
"world_state_download")
.thenFlatMapInParallel(
"requestLoadLocalData",
"requestLoadLocalTrieNodeData",
task -> loadLocalDataStep.loadLocalDataTrieNode(task, requestsToComplete),
3,
bufferCapacity)
@@ -363,17 +363,17 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
return tasks;
})
.thenProcessAsync(
"batchDownloadData",
"batchDownloadTrieNodeData",
tasks -> requestDataStep.requestTrieNodeByPath(tasks),
maxOutstandingRequests)
.thenProcess(
"batchPersistData",
"batchPersistTrieNodeData",
tasks -> {
persistDataStep.persist(tasks);
return tasks;
})
.andFinishWith(
"batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put));
"batchTrieNodeDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put));
return new SnapWorldStateDownloadProcess(
fetchAccountDataPipeline,

View File

@@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -34,6 +35,7 @@ import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.immutables.value.Value;
public class StackTrie {
@@ -41,8 +43,11 @@ public class StackTrie {
private final AtomicInteger nbSegments;
private final int maxSegments;
private final Bytes32 startKeyHash;
private final List<Bytes> proofs;
private final TreeMap<Bytes32, Bytes> keys;
private final Map<Bytes32, TaskElement> elements;
public StackTrie(final Hash rootHash, final Bytes32 startKeyHash) {
this(rootHash, 1, 1, startKeyHash);
}
public StackTrie(
final Hash rootHash,
@@ -53,20 +58,34 @@ public class StackTrie {
this.nbSegments = new AtomicInteger(nbSegments);
this.maxSegments = maxSegments;
this.startKeyHash = startKeyHash;
this.proofs = new ArrayList<>();
this.keys = new TreeMap<>();
this.elements = new LinkedHashMap<>();
}
public void addKeys(final TreeMap<Bytes32, Bytes> keys) {
this.keys.putAll(keys);
public void addElement(
final Bytes32 taskIdentifier, final List<Bytes> proofs, final TreeMap<Bytes32, Bytes> keys) {
this.elements.put(
taskIdentifier, ImmutableTaskElement.builder().proofs(proofs).keys(keys).build());
}
public void addProofs(final List<Bytes> proofs) {
this.proofs.addAll(proofs);
public TaskElement getElement(final Bytes32 taskIdentifier) {
return this.elements.get(taskIdentifier);
}
public void commit(final NodeUpdater nodeUpdater) {
if (nbSegments.decrementAndGet() <= 0 && (!proofs.isEmpty() || !keys.isEmpty())) {
if (nbSegments.decrementAndGet() <= 0 && !elements.isEmpty()) {
final List<Bytes> proofs = new ArrayList<>();
final TreeMap<Bytes32, Bytes> keys = new TreeMap<>();
elements
.values()
.forEach(
taskElement -> {
proofs.addAll(taskElement.proofs());
keys.putAll(taskElement.keys());
});
final Map<Bytes32, Bytes> proofsEntries = new HashMap<>();
for (Bytes proof : proofs) {
proofsEntries.put(Hash.hash(proof), proof);
@@ -82,7 +101,9 @@ public class StackTrie {
true);
final MerklePatriciaTrie<Bytes, Bytes> trie =
new StoredMerklePatriciaTrie<>(snapStoredNodeFactory, rootHash);
new StoredMerklePatriciaTrie<>(
snapStoredNodeFactory,
proofs.isEmpty() ? MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH : rootHash);
for (Map.Entry<Bytes32, Bytes> account : keys.entrySet()) {
trie.put(account.getKey(), new SnapPutVisitor<>(snapStoredNodeFactory, account.getValue()));
@@ -108,4 +129,18 @@ public class StackTrie {
return true;
}
}
@Value.Immutable
public abstract static class TaskElement {
@Value.Default
public List<Bytes> proofs() {
return new ArrayList<>();
}
@Value.Default
public TreeMap<Bytes32, Bytes> keys() {
return new TreeMap<>();
}
}
}

View File

@@ -22,30 +22,24 @@ import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RequestType.ACCOUN
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.StackTrie;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.CommitVisitor;
import org.hyperledger.besu.ethereum.trie.InnerNodeDiscoveryManager;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.Node;
import org.hyperledger.besu.ethereum.trie.NodeUpdater;
import org.hyperledger.besu.ethereum.trie.SnapPutVisitor;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
@@ -59,13 +53,12 @@ public class AccountRangeDataRequest extends SnapDataRequest {
protected final Bytes32 startKeyHash;
protected final Bytes32 endKeyHash;
protected TreeMap<Bytes32, Bytes> accounts;
protected ArrayDeque<Bytes> proofs;
private final Optional<Bytes32> startStorageRange;
private final Optional<Bytes32> endStorageRange;
private final StackTrie stackTrie;
private Optional<Boolean> isProofValid;
protected AccountRangeDataRequest(
final Hash rootHash,
final Bytes32 startKeyHash,
@@ -75,10 +68,10 @@ public class AccountRangeDataRequest extends SnapDataRequest {
super(ACCOUNT_RANGE, rootHash);
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
this.accounts = new TreeMap<>();
this.proofs = new ArrayDeque<>();
this.startStorageRange = startStorageRange;
this.endStorageRange = endStorageRange;
this.isProofValid = Optional.empty();
this.stackTrie = new StackTrie(rootHash, startKeyHash);
LOG.trace(
"create get account range data request with root hash={} from {} to {}",
rootHash,
@@ -117,30 +110,6 @@ public class AccountRangeDataRequest extends SnapDataRequest {
return 0;
}
final Bytes32 storageRoot =
proofs.isEmpty() ? MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH : getRootHash();
final Map<Bytes32, Bytes> proofsEntries = new HashMap<>();
for (Bytes proof : proofs) {
proofsEntries.put(Hash.hash(proof), proof);
}
final InnerNodeDiscoveryManager<Bytes> snapStoredNodeFactory =
new InnerNodeDiscoveryManager<>(
(location, hash) -> Optional.ofNullable(proofsEntries.get(hash)),
Function.identity(),
Function.identity(),
startKeyHash,
accounts.lastKey(),
true);
final MerklePatriciaTrie<Bytes, Bytes> trie =
new StoredMerklePatriciaTrie<>(snapStoredNodeFactory, storageRoot);
for (Map.Entry<Bytes32, Bytes> account : accounts.entrySet()) {
trie.put(account.getKey(), new SnapPutVisitor<>(snapStoredNodeFactory, account.getValue()));
}
// search incomplete nodes in the range
final AtomicInteger nbNodesSaved = new AtomicInteger();
final NodeUpdater nodeUpdater =
@@ -149,38 +118,29 @@ public class AccountRangeDataRequest extends SnapDataRequest {
nbNodesSaved.getAndIncrement();
};
trie.commit(
nodeUpdater,
(new CommitVisitor<>(nodeUpdater) {
@Override
public void maybeStoreNode(final Bytes location, final Node<Bytes> node) {
if (!node.isHealNeeded()) {
super.maybeStoreNode(location, node);
}
}
}));
stackTrie.commit(nodeUpdater);
return nbNodesSaved.get();
}
@Override
public boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
public void addResponse(
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
// validate the range proof
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, getRootHash(), proofs, accounts)) {
clear();
return false;
final TreeMap<Bytes32, Bytes> accounts,
final ArrayDeque<Bytes> proofs) {
if (!accounts.isEmpty() || !proofs.isEmpty()) {
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, getRootHash(), proofs, accounts)) {
isProofValid = Optional.of(false);
} else {
stackTrie.addElement(startKeyHash, proofs, accounts);
isProofValid = Optional.of(true);
}
}
return true;
}
@Override
public boolean isValid() {
return !accounts.isEmpty() || !proofs.isEmpty();
public boolean isResponseReceived() {
return isProofValid.orElse(false);
}
@Override
@@ -190,15 +150,16 @@ public class AccountRangeDataRequest extends SnapDataRequest {
final SnapSyncState snapSyncState) {
final List<SnapDataRequest> childRequests = new ArrayList<>();
final StackTrie.TaskElement taskElement = stackTrie.getElement(startKeyHash);
// new request is added if the response does not match all the requested range
findNewBeginElementInRange(getRootHash(), proofs, accounts, endKeyHash)
findNewBeginElementInRange(getRootHash(), taskElement.proofs(), taskElement.keys(), endKeyHash)
.ifPresent(
missingRightElement ->
childRequests.add(
createAccountRangeDataRequest(getRootHash(), missingRightElement, endKeyHash)));
// find missing storages and code
for (Map.Entry<Bytes32, Bytes> account : accounts.entrySet()) {
for (Map.Entry<Bytes32, Bytes> account : taskElement.keys().entrySet()) {
final StateTrieAccountValue accountValue =
StateTrieAccountValue.readFrom(RLP.input(account.getValue()));
if (!accountValue.getStorageRoot().equals(Hash.EMPTY_TRIE_HASH)) {
@@ -226,25 +187,8 @@ public class AccountRangeDataRequest extends SnapDataRequest {
return endKeyHash;
}
@VisibleForTesting
public TreeMap<Bytes32, Bytes> getAccounts() {
return accounts;
}
public void setAccounts(final TreeMap<Bytes32, Bytes> accounts) {
this.accounts = accounts;
}
public ArrayDeque<Bytes> getProofs() {
return proofs;
}
public void setProofs(final ArrayDeque<Bytes> proofs) {
this.proofs = proofs;
}
@Override
public void clear() {
accounts = new TreeMap<>();
proofs = new ArrayDeque<>();
return stackTrie.getElement(startKeyHash).keys();
}
}

View File

@@ -21,7 +21,6 @@ import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
@@ -67,15 +66,7 @@ public class BytecodeRequest extends SnapDataRequest {
}
@Override
public boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
return true;
}
@Override
public boolean isValid() {
public boolean isResponseReceived() {
return !code.isEmpty();
}

View File

@@ -22,7 +22,6 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloaderException;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
@@ -111,12 +110,7 @@ public abstract class SnapDataRequest implements TasksPriorityProvider {
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState);
public abstract boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
SnapSyncState snapSyncState);
public abstract boolean isValid();
public abstract boolean isResponseReceived();
public boolean isExpired(final SnapSyncState snapSyncState) {
return false;

View File

@@ -51,15 +51,13 @@ public class StorageRangeDataRequest extends SnapDataRequest {
private static final Logger LOG = LoggerFactory.getLogger(StorageRangeDataRequest.class);
private final Bytes32 accountHash;
private final Hash accountHash;
private final Bytes32 storageRoot;
private final Bytes32 startKeyHash;
private final Bytes32 endKeyHash;
private StackTrie stackTrie;
private TreeMap<Bytes32, Bytes> slots;
private ArrayDeque<Bytes> proofs;
private boolean isProofValid;
private Optional<Boolean> isProofValid;
protected StorageRangeDataRequest(
final Hash rootHash,
@@ -68,12 +66,11 @@ public class StorageRangeDataRequest extends SnapDataRequest {
final Bytes32 startKeyHash,
final Bytes32 endKeyHash) {
super(STORAGE_RANGE, rootHash);
this.accountHash = accountHash;
this.accountHash = Hash.wrap(accountHash);
this.storageRoot = storageRoot;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
this.proofs = new ArrayDeque<>();
this.slots = new TreeMap<>();
this.isProofValid = Optional.empty();
addStackTrie(Optional.empty());
LOG.trace(
"create get storage range data request for account {} with root hash={} from {} to {}",
@@ -90,23 +87,18 @@ public class StorageRangeDataRequest extends SnapDataRequest {
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
if (isProofValid) {
stackTrie.addKeys(slots);
stackTrie.addProofs(proofs);
}
// search incomplete nodes in the range
final AtomicInteger nbNodesSaved = new AtomicInteger();
final AtomicReference<Updater> updaterTmp = new AtomicReference<>(worldStateStorage.updater());
final NodeUpdater nodeUpdater =
(location, hash, value) -> {
// create small batch in order to commit small amount of nodes at the same time
updaterTmp.get().putAccountStorageTrieNode(Hash.wrap(accountHash), location, hash, value);
updaterTmp.get().putAccountStorageTrieNode(accountHash, location, hash, value);
if (nbNodesSaved.getAndIncrement() % 1000 == 0) {
updaterTmp.get().commit();
updaterTmp.set(worldStateStorage.updater());
}
};
stackTrie.commit(nodeUpdater);
updaterTmp.get().commit();
@@ -114,26 +106,28 @@ public class StorageRangeDataRequest extends SnapDataRequest {
return nbNodesSaved.get();
}
@Override
public boolean checkProof(
public void addResponse(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
downloadState.enqueueRequest(
createAccountDataRequest(
getRootHash(), Hash.wrap(accountHash), startKeyHash, endKeyHash));
isProofValid = false;
} else {
isProofValid = true;
final TreeMap<Bytes32, Bytes> slots,
final ArrayDeque<Bytes> proofs) {
if (!slots.isEmpty() || !proofs.isEmpty()) {
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
downloadState.enqueueRequest(
createAccountDataRequest(
getRootHash(), Hash.wrap(accountHash), startKeyHash, endKeyHash));
isProofValid = Optional.of(false);
} else {
stackTrie.addElement(startKeyHash, proofs, slots);
isProofValid = Optional.of(true);
}
}
return isProofValid;
}
@Override
public boolean isValid() {
return !slots.isEmpty() || !proofs.isEmpty();
public boolean isResponseReceived() {
return isProofValid.isPresent();
}
@Override
@@ -148,14 +142,16 @@ public class StorageRangeDataRequest extends SnapDataRequest {
final SnapSyncState snapSyncState) {
final List<SnapDataRequest> childRequests = new ArrayList<>();
if (!isProofValid) {
if (!isProofValid.orElse(false)) {
return Stream.empty();
}
findNewBeginElementInRange(storageRoot, proofs, slots, endKeyHash)
final StackTrie.TaskElement taskElement = stackTrie.getElement(startKeyHash);
findNewBeginElementInRange(storageRoot, taskElement.proofs(), taskElement.keys(), endKeyHash)
.ifPresent(
missingRightElement -> {
final int nbRanges = findNbRanges();
final int nbRanges = findNbRanges(taskElement.keys());
RangeManager.generateRanges(missingRightElement, endKeyHash, nbRanges)
.forEach(
(key, value) -> {
@@ -176,7 +172,7 @@ public class StorageRangeDataRequest extends SnapDataRequest {
return childRequests.stream();
}
private int findNbRanges() {
private int findNbRanges(final TreeMap<Bytes32, Bytes> slots) {
if (startKeyHash.equals(MIN_RANGE) && endKeyHash.equals(MAX_RANGE)) {
return MAX_RANGE
.toUnsignedBigInteger()
@@ -196,7 +192,7 @@ public class StorageRangeDataRequest extends SnapDataRequest {
}
public TreeMap<Bytes32, Bytes> getSlots() {
return slots;
return stackTrie.getElement(startKeyHash).keys();
}
public Bytes32 getStartKeyHash() {
@@ -207,17 +203,9 @@ public class StorageRangeDataRequest extends SnapDataRequest {
return endKeyHash;
}
public void setProofs(final ArrayDeque<Bytes> proofs) {
this.proofs = proofs;
}
public void setSlots(final TreeMap<Bytes32, Bytes> slots) {
this.slots = slots;
}
@VisibleForTesting
public void setProofValid(final boolean proofValid) {
isProofValid = proofValid;
public void setProofValid(final boolean isProofValid) {
this.isProofValid = Optional.of(isProofValid);
}
public void addStackTrie(final Optional<StackTrie> maybeStackTrie) {

View File

@@ -21,7 +21,6 @@ import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.trie.Node;
import org.hyperledger.besu.ethereum.trie.TrieNodeDecoder;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
@@ -58,7 +57,7 @@ public abstract class TrieNodeDataRequest extends SnapDataRequest implements Tas
final WorldStateStorage.Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
if (!isValid() || isExpired(snapSyncState) || pendingChildren.get() > 0) {
if (isExpired(snapSyncState) || pendingChildren.get() > 0) {
// we do nothing. Our last child will eventually persist us.
return 0;
}
@@ -81,7 +80,7 @@ public abstract class TrieNodeDataRequest extends SnapDataRequest implements Tas
final SnapWorldDownloadState downloadState,
final WorldStateStorage worldStateStorage,
final SnapSyncState snapSyncState) {
if (!isValid()) {
if (!isResponseReceived()) {
// If this node hasn't been downloaded yet, we can't return any child data
return Stream.empty();
}
@@ -109,20 +108,12 @@ public abstract class TrieNodeDataRequest extends SnapDataRequest implements Tas
.peek(request -> request.registerParent(this));
}
@Override
public boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
return true;
}
public boolean isRoot() {
return possibleParent.isEmpty();
}
@Override
public boolean isValid() {
public boolean isResponseReceived() {
return !data.isEmpty() && Hash.hash(data).equals(getNodeHash());
}

View File

@@ -89,7 +89,7 @@ public class LoadLocalDataStepTest {
loadLocalDataStep.loadLocalDataTrieNode(task, completedTasks);
assertThat(completedTasks.poll()).isSameAs(task);
assertThat(request.isValid()).isTrue();
assertThat(request.isResponseReceived()).isTrue();
assertThat(output).isEmpty();
verify(updater).commit();

View File

@@ -76,8 +76,8 @@ public class StackTrieTest {
worldStateProofProvider.getAccountProofRelatedNodes(
Hash.wrap(accountStateTrie.getRootHash()), accounts.lastKey()));
stackTrie.addKeys(accounts);
stackTrie.addProofs(proofs);
stackTrie.addElement(Bytes32.random(), proofs, accounts);
final WorldStateStorage.Updater updater = recreatedWorldStateStorage.updater();
stackTrie.commit(updater::putAccountStateTrieNode);
updater.commit();
@@ -129,8 +129,8 @@ public class StackTrieTest {
worldStateProofProvider.getAccountProofRelatedNodes(
Hash.wrap(accountStateTrie.getRootHash()), accounts.lastKey()));
stackTrie.addKeys(accounts);
stackTrie.addProofs(proofs);
stackTrie.addElement(Bytes32.random(), proofs, accounts);
final WorldStateStorage.Updater updater = recreatedWorldStateStorage.updater();
stackTrie.commit(updater::putAccountStateTrieNode);
updater.commit();

View File

@@ -21,6 +21,7 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataR
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.RangeStorageEntriesCollector;
@@ -41,8 +42,13 @@ import org.apache.tuweni.bytes.Bytes32;
public class TaskGenerator {
public static List<Task<SnapDataRequest>> createAccountRequest(final boolean withData) {
final WorldStateStorage worldStateStorage =
new InMemoryKeyValueStorageProvider().createWorldStateStorage(DataStorageFormat.FOREST);
final WorldStateProofProvider worldStateProofProvider =
new WorldStateProofProvider(worldStateStorage);
final MerklePatriciaTrie<Bytes32, Bytes> trie =
TrieGenerator.generateTrie(worldStateStorage, 1);
final RangeStorageEntriesCollector collector =
@@ -62,8 +68,7 @@ public class TaskGenerator {
SnapDataRequest.createAccountRangeDataRequest(
rootHash, RangeManager.MIN_RANGE, RangeManager.MAX_RANGE);
if (withData) {
accountRangeDataRequest.setAccounts(accounts);
accountRangeDataRequest.setProofs(new ArrayDeque<>());
accountRangeDataRequest.addResponse(worldStateProofProvider, accounts, new ArrayDeque<>());
}
final StateTrieAccountValue stateTrieAccountValue =
@@ -72,6 +77,7 @@ public class TaskGenerator {
final StorageRangeDataRequest storageRangeDataRequest =
createStorageRangeDataRequest(
worldStateProofProvider,
worldStateStorage,
rootHash,
accountHash,
@@ -92,6 +98,7 @@ public class TaskGenerator {
}
private static StorageRangeDataRequest createStorageRangeDataRequest(
final WorldStateProofProvider worldStateProofProvider,
final WorldStateStorage worldStateStorage,
final Hash rootHash,
final Hash accountHash,
@@ -122,8 +129,7 @@ public class TaskGenerator {
rootHash, accountHash, storageRoot, RangeManager.MIN_RANGE, RangeManager.MAX_RANGE);
if (withData) {
request.setProofValid(true);
request.setSlots(slots);
request.setProofs(new ArrayDeque<>());
request.addResponse(null, worldStateProofProvider, slots, new ArrayDeque<>());
}
return request;
}

View File

@@ -21,10 +21,10 @@ import java.util.concurrent.atomic.AtomicLong;
/** Counter that triggers a specific task each time a step is hit. */
public class RunnableCounter implements Counter {
private final Counter backedCounter;
private final Runnable task;
private final int step;
private final AtomicLong stepCounter;
protected final Counter backedCounter;
protected final Runnable task;
protected final int step;
protected final AtomicLong stepCounter;
public RunnableCounter(final Counter backedCounter, final Runnable task, final int step) {
this.backedCounter = backedCounter;

View File

@@ -34,7 +34,7 @@ public class InMemoryTasksPriorityQueues<T extends TasksPriorityProvider>
clearInternalQueues();
}
public void clearInternalQueues() {
public synchronized void clearInternalQueues() {
internalQueues.clear();
for (int i = 0; i < 16; i++) {
internalQueues.add(newEmptyQueue());