preparing the snapsync pipeline (#3625)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
This commit is contained in:
matkt
2022-03-29 11:03:57 +02:00
committed by GitHub
parent abf9b0d057
commit 7e0d06ac61
41 changed files with 2445 additions and 44 deletions

View File

@@ -149,6 +149,12 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage {
trieLogStorage.clear();
}
@Override
public void clearFlatDatabase() {
accountStorage.clear();
storageStorage.clear();
}
@Override
public Updater updater() {
return new Updater(

View File

@@ -97,6 +97,11 @@ public class WorldStateKeyValueStorage implements WorldStateStorage {
keyValueStorage.clear();
}
@Override
public void clearFlatDatabase() {
// nothing to do for forest
}
@Override
public Updater updater() {
return new Updater(lock, keyValueStorage.startTransaction(), nodeAddedListeners);

View File

@@ -44,6 +44,8 @@ public interface WorldStateStorage {
void clear();
void clearFlatDatabase();
Updater updater();
long prune(Predicate<byte[]> inUseCheck);

View File

@@ -16,7 +16,6 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -31,17 +30,14 @@ import org.slf4j.LoggerFactory;
public class CompleteTaskStep {
private static final Logger LOG = LoggerFactory.getLogger(CompleteTaskStep.class);
private static final int DISPLAY_PROGRESS_STEP = 100000;
private final WorldStateStorage worldStateStorage;
private final RunnableCounter completedRequestsCounter;
private final Counter retriedRequestsCounter;
private final LongSupplier worldStatePendingRequestsCurrentSupplier;
private long lastLogAt = System.currentTimeMillis();
public CompleteTaskStep(
final WorldStateStorage worldStateStorage,
final MetricsSystem metricsSystem,
final LongSupplier worldStatePendingRequestsCurrentSupplier) {
this.worldStateStorage = worldStateStorage;
this.worldStatePendingRequestsCurrentSupplier = worldStatePendingRequestsCurrentSupplier;
completedRequestsCounter =
new RunnableCounter(
@@ -65,7 +61,7 @@ public class CompleteTaskStep {
if (task.getData().getData() != null) {
completedRequestsCounter.inc();
task.markCompleted();
downloadState.checkCompletion(worldStateStorage, header);
downloadState.checkCompletion(header);
} else {
retriedRequestsCounter.inc();
task.markFailed();

View File

@@ -31,16 +31,21 @@ public class FastWorldDownloadState extends WorldDownloadState<NodeDataRequest>
private static final Logger LOG = LoggerFactory.getLogger(FastWorldDownloadState.class);
public FastWorldDownloadState(
final WorldStateStorage worldStateStorage,
final InMemoryTasksPriorityQueues<NodeDataRequest> pendingRequests,
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock) {
super(pendingRequests, maxRequestsWithoutProgress, minMillisBeforeStalling, clock);
super(
worldStateStorage,
pendingRequests,
maxRequestsWithoutProgress,
minMillisBeforeStalling,
clock);
}
@Override
public synchronized boolean checkCompletion(
final WorldStateStorage worldStateStorage, final BlockHeader header) {
public synchronized boolean checkCompletion(final BlockHeader header) {
if (!internalFuture.isDone() && pendingRequests.allTasksCompleted()) {
if (rootNodeData == null) {
enqueueRequest(

View File

@@ -134,7 +134,11 @@ public class FastWorldStateDownloader implements WorldStateDownloader {
final FastWorldDownloadState newDownloadState =
new FastWorldDownloadState(
taskCollection, maxNodeRequestsWithoutProgress, minMillisBeforeStalling, clock);
worldStateStorage,
taskCollection,
maxNodeRequestsWithoutProgress,
minMillisBeforeStalling,
clock);
this.downloadState.set(newDownloadState);
if (!newDownloadState.downloadWasResumed()) {
@@ -143,8 +147,7 @@ public class FastWorldStateDownloader implements WorldStateDownloader {
NodeDataRequest.createAccountDataRequest(stateRoot, Optional.of(Bytes.EMPTY)));
}
maybeCompleteTask =
Optional.of(new CompleteTaskStep(worldStateStorage, metricsSystem, taskCollection::size));
maybeCompleteTask = Optional.of(new CompleteTaskStep(metricsSystem, taskCollection::size));
final FastWorldStateDownloadProcess downloadProcess =
FastWorldStateDownloadProcess.builder()
.hashCountPerRequest(hashCountPerRequest)

View File

@@ -36,8 +36,7 @@ import org.apache.tuweni.bytes.Bytes32;
*/
public class RangeManager {
public static final Hash MIN_RANGE =
Hash.fromHexString("0x0000000000000000000000000000000000000000000000000000000000000000");
public static final Hash MIN_RANGE = Hash.wrap(Bytes32.ZERO);
public static final Hash MAX_RANGE =
Hash.fromHexString("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff");

View File

@@ -0,0 +1,47 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
public enum RequestType {
ACCOUNT_RANGE((byte) 1),
STORAGE_RANGE((byte) 2),
BYTECODES((byte) 3),
TRIE_NODE((byte) 4);
private final byte value;
RequestType(final byte value) {
this.value = value;
}
public byte getValue() {
return value;
}
public static RequestType fromValue(final byte value) {
switch (value) {
case (byte) 1:
return ACCOUNT_RANGE;
case (byte) 2:
return STORAGE_RANGE;
case (byte) 3:
return BYTECODES;
case (byte) 4:
return TRIE_NODE;
default:
throw new IllegalArgumentException("Invalid value supplied");
}
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.ethereum.core.SealableBlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
public class SnapSyncState extends FastSyncState {
private boolean isHealInProgress;
public SnapSyncState(final FastSyncState fastSyncState) {
super(fastSyncState.getPivotBlockNumber(), fastSyncState.getPivotBlockHeader());
}
public boolean isHealInProgress() {
return isHealInProgress;
}
public void setHealStatus(final boolean healStatus) {
isHealInProgress = healStatus;
}
public boolean isExpired(final SnapDataRequest request) {
return getPivotBlockHeader()
.map(SealableBlockHeader::getStateRoot)
.filter(hash -> hash.equals(request.getRootHash()))
.isEmpty();
}
}

View File

@@ -0,0 +1,262 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest.createAccountTrieNodeDataRequest;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.tasks.InMemoryTaskQueue;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import org.hyperledger.besu.services.tasks.Task;
import org.hyperledger.besu.services.tasks.TaskCollection;
import java.time.Clock;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest> {
private static final Logger LOG = LoggerFactory.getLogger(SnapWorldDownloadState.class);
private static final int DISPLAY_PROGRESS_STEP = 100000;
protected final InMemoryTaskQueue<SnapDataRequest> pendingAccountRequests =
new InMemoryTaskQueue<>();
protected final InMemoryTaskQueue<SnapDataRequest> pendingStorageRequests =
new InMemoryTaskQueue<>();
protected final InMemoryTaskQueue<SnapDataRequest> pendingBigStorageRequests =
new InMemoryTaskQueue<>();
protected final InMemoryTaskQueue<SnapDataRequest> pendingCodeRequests =
new InMemoryTaskQueue<>();
protected final InMemoryTasksPriorityQueues<SnapDataRequest> pendingTrieNodeRequests =
new InMemoryTasksPriorityQueues<>();
public final HashSet<Bytes> inconsistentAccounts = new HashSet<>();
private final SnapSyncState snapSyncState;
// metrics round the snapsync
private final RunnableCounter generatedNodes;
private final RunnableCounter healedNodes;
public SnapWorldDownloadState(
final WorldStateStorage worldStateStorage,
final SnapSyncState snapSyncState,
final InMemoryTasksPriorityQueues<SnapDataRequest> pendingRequests,
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final MetricsSystem metricsSystem,
final Clock clock) {
super(
worldStateStorage,
pendingRequests,
maxRequestsWithoutProgress,
minMillisBeforeStalling,
clock);
this.snapSyncState = snapSyncState;
this.generatedNodes =
new RunnableCounter(
metricsSystem.createCounter(
BesuMetricCategory.SYNCHRONIZER,
"snapsync_world_state_generated_nodes_total",
"Total number of data nodes generated as part of snap sync world state download"),
this::displayWorldStateSyncProgress,
DISPLAY_PROGRESS_STEP);
this.healedNodes =
new RunnableCounter(
metricsSystem.createCounter(
BesuMetricCategory.SYNCHRONIZER,
"snapsync_world_state_healed_nodes_total",
"Total number of data nodes healed as part of snap sync world state heal process"),
this::displayHealProgress,
DISPLAY_PROGRESS_STEP);
}
@Override
public synchronized void notifyTaskAvailable() {
notifyAll();
}
@Override
protected synchronized void markAsStalled(final int maxNodeRequestRetries) {
// TODO retry when mark as stalled
}
@Override
public synchronized boolean checkCompletion(final BlockHeader header) {
if (!internalFuture.isDone()
&& pendingAccountRequests.allTasksCompleted()
&& pendingCodeRequests.allTasksCompleted()
&& pendingStorageRequests.allTasksCompleted()
&& pendingBigStorageRequests.allTasksCompleted()
&& pendingTrieNodeRequests.allTasksCompleted()) {
if (!snapSyncState.isHealInProgress()) {
snapSyncState.setHealStatus(true);
enqueueRequest(
createAccountTrieNodeDataRequest(
snapSyncState.getPivotBlockHeader().orElseThrow().getStateRoot(),
Bytes.EMPTY,
inconsistentAccounts));
} else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
LOG.info(
"Finished downloading world state from peers (generated nodes {} / healed nodes {})",
generatedNodes.get(),
healedNodes.get());
internalFuture.complete(null);
return true;
}
}
return false;
}
@Override
protected synchronized void cleanupQueues() {
super.cleanupQueues();
pendingAccountRequests.clear();
pendingStorageRequests.clear();
pendingBigStorageRequests.clear();
pendingCodeRequests.clear();
pendingTrieNodeRequests.clear();
}
@Override
public synchronized void enqueueRequest(final SnapDataRequest request) {
if (!internalFuture.isDone()) {
if (request instanceof BytecodeRequest) {
pendingCodeRequests.add(request);
} else if (request instanceof StorageRangeDataRequest) {
if (!((StorageRangeDataRequest) request).getStartKeyHash().equals(RangeManager.MIN_RANGE)) {
pendingBigStorageRequests.add(request);
} else {
pendingStorageRequests.add(request);
}
} else if (request instanceof AccountRangeDataRequest) {
pendingAccountRequests.add(request);
} else {
pendingTrieNodeRequests.add(request);
}
notifyAll();
}
}
public void addInconsistentAccount(final Bytes account) {
inconsistentAccounts.add(account);
}
@Override
public synchronized void enqueueRequests(final Stream<SnapDataRequest> requests) {
if (!internalFuture.isDone()) {
requests.forEach(this::enqueueRequest);
}
}
public synchronized Task<SnapDataRequest> dequeueRequestBlocking(
final List<TaskCollection<SnapDataRequest>> queueDependencies,
final List<TaskCollection<SnapDataRequest>> queues) {
while (!internalFuture.isDone()) {
while (queueDependencies.stream()
.map(TaskCollection::allTasksCompleted)
.anyMatch(Predicate.isEqual(false))) {
try {
wait();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
for (TaskCollection<SnapDataRequest> queue : queues) {
Task<SnapDataRequest> task = queue.remove();
if (task != null) {
return task;
}
}
try {
wait();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return null;
}
public synchronized Task<SnapDataRequest> dequeueAccountRequestBlocking() {
return dequeueRequestBlocking(
List.of(pendingStorageRequests, pendingCodeRequests), List.of(pendingAccountRequests));
}
public synchronized Task<SnapDataRequest> dequeueBigStorageRequestBlocking() {
return dequeueRequestBlocking(Collections.emptyList(), List.of(pendingBigStorageRequests));
}
public synchronized Task<SnapDataRequest> dequeueStorageRequestBlocking() {
return dequeueRequestBlocking(Collections.emptyList(), List.of(pendingStorageRequests));
}
public synchronized Task<SnapDataRequest> dequeueCodeRequestBlocking() {
return dequeueRequestBlocking(List.of(pendingStorageRequests), List.of(pendingCodeRequests));
}
public synchronized Task<SnapDataRequest> dequeueTrieNodeRequestBlocking() {
return dequeueRequestBlocking(
List.of(pendingAccountRequests, pendingStorageRequests, pendingBigStorageRequests),
List.of(pendingTrieNodeRequests));
}
public void clearTrieNodes() {
worldStateStorage.clearFlatDatabase();
pendingTrieNodeRequests.clearInternalQueues();
pendingCodeRequests.clearInternalQueue();
snapSyncState.setHealStatus(false);
}
public RunnableCounter getGeneratedNodes() {
return generatedNodes;
}
public RunnableCounter getHealedNodes() {
return healedNodes;
}
private void displayWorldStateSyncProgress() {
LOG.info("Generated {} world state nodes", generatedNodes.get());
}
private void displayHealProgress() {
LOG.info("Healed {} world sync nodes", healedNodes.get());
}
}

View File

@@ -0,0 +1,111 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.trie.CommitVisitor;
import org.hyperledger.besu.ethereum.trie.InnerNodeDiscoveryManager;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.Node;
import org.hyperledger.besu.ethereum.trie.NodeUpdater;
import org.hyperledger.besu.ethereum.trie.SnapPutVisitor;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
public class StackTrie {
private final Bytes32 rootHash;
private final AtomicInteger nbSegments;
private final int maxSegments;
private final Bytes32 startKeyHash;
private final List<Bytes> proofs;
private final TreeMap<Bytes32, Bytes> keys;
public StackTrie(
final Hash rootHash,
final int nbSegments,
final int maxSegments,
final Bytes32 startKeyHash) {
this.rootHash = rootHash;
this.nbSegments = new AtomicInteger(nbSegments);
this.maxSegments = maxSegments;
this.startKeyHash = startKeyHash;
this.proofs = new ArrayList<>();
this.keys = new TreeMap<>();
}
public void addKeys(final TreeMap<Bytes32, Bytes> keys) {
this.keys.putAll(keys);
}
public void addProofs(final List<Bytes> proofs) {
this.proofs.addAll(proofs);
}
public void commit(final NodeUpdater nodeUpdater) {
if (nbSegments.decrementAndGet() <= 0 && (!proofs.isEmpty() || !keys.isEmpty())) {
final Map<Bytes32, Bytes> proofsEntries = new HashMap<>();
for (Bytes proof : proofs) {
proofsEntries.put(Hash.hash(proof), proof);
}
final InnerNodeDiscoveryManager<Bytes> snapStoredNodeFactory =
new InnerNodeDiscoveryManager<>(
(location, hash) -> Optional.ofNullable(proofsEntries.get(hash)),
Function.identity(),
Function.identity(),
startKeyHash,
keys.lastKey(),
true);
final MerklePatriciaTrie<Bytes, Bytes> trie =
new StoredMerklePatriciaTrie<>(snapStoredNodeFactory, rootHash);
for (Map.Entry<Bytes32, Bytes> account : keys.entrySet()) {
trie.put(account.getKey(), new SnapPutVisitor<>(snapStoredNodeFactory, account.getValue()));
}
trie.commit(
nodeUpdater,
(new CommitVisitor<>(nodeUpdater) {
@Override
public void maybeStoreNode(final Bytes location, final Node<Bytes> node) {
if (!node.isHealNeeded()) {
super.maybeStoreNode(location, node);
}
}
}));
}
}
public boolean addSegment() {
if (nbSegments.get() > maxSegments) {
return false;
} else {
nbSegments.incrementAndGet();
return true;
}
}
}

View File

@@ -0,0 +1,248 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync.request;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager.MAX_RANGE;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager.MIN_RANGE;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager.findNewBeginElementInRange;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RequestType.ACCOUNT_RANGE;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.CommitVisitor;
import org.hyperledger.besu.ethereum.trie.InnerNodeDiscoveryManager;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.Node;
import org.hyperledger.besu.ethereum.trie.NodeUpdater;
import org.hyperledger.besu.ethereum.trie.SnapPutVisitor;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Returns a list of accounts and the merkle proofs of an entire range */
public class AccountRangeDataRequest extends SnapDataRequest {
private static final Logger LOG = LoggerFactory.getLogger(AccountRangeDataRequest.class);
protected final Bytes32 startKeyHash;
protected final Bytes32 endKeyHash;
protected TreeMap<Bytes32, Bytes> accounts;
protected ArrayDeque<Bytes> proofs;
private final Optional<Bytes32> startStorageRange;
private final Optional<Bytes32> endStorageRange;
protected AccountRangeDataRequest(
final Hash rootHash,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final Optional<Bytes32> startStorageRange,
final Optional<Bytes32> endStorageRange) {
super(ACCOUNT_RANGE, rootHash);
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
this.accounts = new TreeMap<>();
this.proofs = new ArrayDeque<>();
this.startStorageRange = startStorageRange;
this.endStorageRange = endStorageRange;
LOG.trace(
"create get account range data request with root hash={} from {} to {}",
rootHash,
startKeyHash,
endKeyHash);
}
protected AccountRangeDataRequest(
final Hash rootHash, final Bytes32 startKeyHash, final Bytes32 endKeyHash) {
this(rootHash, startKeyHash, endKeyHash, Optional.empty(), Optional.empty());
}
protected AccountRangeDataRequest(
final Hash rootHash,
final Hash accountHash,
final Bytes32 startStorageRange,
final Bytes32 endStorageRange) {
this(
rootHash,
accountHash,
accountHash,
Optional.of(startStorageRange),
Optional.of(endStorageRange));
}
@Override
protected int doPersist(
final WorldStateStorage worldStateStorage,
final Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
if (startStorageRange.isPresent() && endStorageRange.isPresent()) {
// not store the new account if we just want to complete the account thanks to another
// rootHash
return 0;
}
final Bytes32 storageRoot =
proofs.isEmpty() ? MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH : getRootHash();
final Map<Bytes32, Bytes> proofsEntries = new HashMap<>();
for (Bytes proof : proofs) {
proofsEntries.put(Hash.hash(proof), proof);
}
final InnerNodeDiscoveryManager<Bytes> snapStoredNodeFactory =
new InnerNodeDiscoveryManager<>(
(location, hash) -> Optional.ofNullable(proofsEntries.get(hash)),
Function.identity(),
Function.identity(),
startKeyHash,
accounts.lastKey(),
true);
final MerklePatriciaTrie<Bytes, Bytes> trie =
new StoredMerklePatriciaTrie<>(snapStoredNodeFactory, storageRoot);
for (Map.Entry<Bytes32, Bytes> account : accounts.entrySet()) {
trie.put(account.getKey(), new SnapPutVisitor<>(snapStoredNodeFactory, account.getValue()));
}
// search incomplete nodes in the range
final AtomicInteger nbNodesSaved = new AtomicInteger();
final NodeUpdater nodeUpdater =
(location, hash, value) -> {
updater.putAccountStateTrieNode(location, hash, value);
nbNodesSaved.getAndIncrement();
};
trie.commit(
nodeUpdater,
(new CommitVisitor<>(nodeUpdater) {
@Override
public void maybeStoreNode(final Bytes location, final Node<Bytes> node) {
if (!node.isHealNeeded()) {
super.maybeStoreNode(location, node);
}
}
}));
return nbNodesSaved.get();
}
@Override
public boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
// validate the range proof
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, getRootHash(), proofs, accounts)) {
clear();
return false;
}
return true;
}
@Override
public boolean isValid() {
return !accounts.isEmpty() || !proofs.isEmpty();
}
@Override
public Stream<SnapDataRequest> getChildRequests(
final SnapWorldDownloadState downloadState,
final WorldStateStorage worldStateStorage,
final SnapSyncState snapSyncState) {
final List<SnapDataRequest> childRequests = new ArrayList<>();
// new request is added if the response does not match all the requested range
findNewBeginElementInRange(getRootHash(), proofs, accounts, endKeyHash)
.ifPresent(
missingRightElement ->
childRequests.add(
createAccountRangeDataRequest(getRootHash(), missingRightElement, endKeyHash)));
// find missing storages and code
for (Map.Entry<Bytes32, Bytes> account : accounts.entrySet()) {
final StateTrieAccountValue accountValue =
StateTrieAccountValue.readFrom(RLP.input(account.getValue()));
if (!accountValue.getStorageRoot().equals(Hash.EMPTY_TRIE_HASH)) {
childRequests.add(
createStorageRangeDataRequest(
account.getKey(),
accountValue.getStorageRoot(),
startStorageRange.orElse(MIN_RANGE),
endStorageRange.orElse(MAX_RANGE)));
}
if (!accountValue.getCodeHash().equals(Hash.EMPTY)) {
childRequests.add(createBytecodeRequest(account.getKey(), accountValue.getCodeHash()));
}
}
return childRequests.stream();
}
public Bytes32 getStartKeyHash() {
return startKeyHash;
}
public Bytes32 getEndKeyHash() {
return endKeyHash;
}
public TreeMap<Bytes32, Bytes> getAccounts() {
return accounts;
}
public void setAccounts(final TreeMap<Bytes32, Bytes> accounts) {
this.accounts = accounts;
}
public ArrayDeque<Bytes> getProofs() {
return proofs;
}
public void setProofs(final ArrayDeque<Bytes> proofs) {
this.proofs = proofs;
}
@Override
public void clear() {
accounts = new TreeMap<>();
proofs = new ArrayDeque<>();
}
}

View File

@@ -0,0 +1,160 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync.request;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
public class AccountTrieNodeDataRequest extends TrieNodeDataRequest {
private final HashSet<Bytes> inconsistentAccounts;
AccountTrieNodeDataRequest(
final Hash hash,
final Hash originalRootHash,
final Bytes location,
final HashSet<Bytes> inconsistentAccounts) {
super(hash, originalRootHash, location);
this.inconsistentAccounts = inconsistentAccounts;
}
@Override
protected int doPersist(
final WorldStateStorage worldStateStorage,
final WorldStateStorage.Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
if (isRoot()) {
downloadState.setRootNodeData(data);
}
updater.putAccountStateTrieNode(getLocation(), getNodeHash(), data);
return 1;
}
@Override
public Optional<Bytes> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage
.getAccountStateTrieNode(getLocation(), getNodeHash())
.filter(data -> !getLocation().isEmpty())
.filter(data -> Hash.hash(data).equals(getNodeHash()));
}
@Override
protected SnapDataRequest createChildNodeDataRequest(final Hash childHash, final Bytes location) {
return createAccountTrieNodeDataRequest(
childHash, getRootHash(), location, getSubLocation(location));
}
private HashSet<Bytes> getSubLocation(final Bytes location) {
final HashSet<Bytes> foundAccountsToHeal = new HashSet<>();
for (Bytes account : inconsistentAccounts) {
if (account.commonPrefixLength(location) == location.size()) {
foundAccountsToHeal.add(account);
}
}
return foundAccountsToHeal;
}
@Override
public Stream<SnapDataRequest> getRootStorageRequests(final WorldStateStorage worldStateStorage) {
final List<SnapDataRequest> requests = new ArrayList<>();
final StoredMerklePatriciaTrie<Bytes, Bytes> accountTrie =
new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode,
Hash.hash(data),
getLocation(),
Function.identity(),
Function.identity());
for (Bytes account : inconsistentAccounts) {
final Bytes32 accountHash = Bytes32.wrap(CompactEncoding.pathToBytes(account));
accountTrie
.getPath(
Bytes.wrap(
account.toArrayUnsafe(),
getLocation().size(),
account.size() - getLocation().size()))
.map(RLP::input)
.map(StateTrieAccountValue::readFrom)
.ifPresent(
stateTrieAccountValue -> {
// an account need a heal step
requests.add(
createStorageTrieNodeDataRequest(
stateTrieAccountValue.getStorageRoot(),
Hash.wrap(accountHash),
getRootHash(),
Bytes.EMPTY));
});
}
return requests.stream();
}
@Override
protected Stream<SnapDataRequest> getRequestsFromTrieNodeValue(
final WorldStateStorage worldStateStorage,
final Bytes location,
final Bytes path,
final Bytes value) {
final Stream.Builder<SnapDataRequest> builder = Stream.builder();
final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value));
// Retrieve account hash
final Hash accountHash =
Hash.wrap(
Bytes32.wrap(CompactEncoding.pathToBytes(Bytes.concatenate(getLocation(), path))));
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
((BonsaiWorldStateKeyValueStorage.Updater) worldStateStorage.updater())
.putAccountInfoState(accountHash, value)
.commit();
}
// Add code, if appropriate
if (!accountValue.getCodeHash().equals(Hash.EMPTY)) {
builder.add(createBytecodeRequest(accountHash, accountValue.getCodeHash()));
}
// Add storage, if appropriate
if (!accountValue.getStorageRoot().equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
// If we detect an account storage we fill it with snapsync before completing with a heal
final SnapDataRequest storageTrieRequest =
createStorageTrieNodeDataRequest(
accountValue.getStorageRoot(), accountHash, getRootHash(), Bytes.EMPTY);
builder.add(storageTrieRequest);
}
return builder.build();
}
@Override
public List<Bytes> getTrieNodePath() {
return List.of(CompactEncoding.encode(getLocation()));
}
}

View File

@@ -0,0 +1,111 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync.request;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RequestType.BYTECODES;
import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
/** Returns a list of bytecodes */
public class BytecodeRequest extends SnapDataRequest {
private static final Logger LOG = getLogger(BytecodeRequest.class);
private final Bytes32 accountHash;
private final Bytes32 codeHash;
private Bytes code;
protected BytecodeRequest(
final Hash rootHash, final Bytes32 accountHash, final Bytes32 codeHash) {
super(BYTECODES, rootHash);
LOG.trace("create get bytecode data request for {} with root hash={}", accountHash, rootHash);
this.accountHash = accountHash;
this.codeHash = codeHash;
this.code = Bytes.EMPTY;
}
@Override
protected int doPersist(
final WorldStateStorage worldStateStorage,
final Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
updater.putCode(Hash.wrap(accountHash), code);
return possibleParent
.map(
trieNodeDataRequest ->
trieNodeDataRequest.saveParent(
worldStateStorage, updater, downloadState, snapSyncState)
+ 1)
.orElse(1);
}
@Override
public boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
return true;
}
@Override
public boolean isValid() {
return !code.isEmpty();
}
@Override
public Stream<SnapDataRequest> getChildRequests(
final SnapWorldDownloadState downloadState,
final WorldStateStorage worldStateStorage,
final SnapSyncState snapSyncState) {
return Stream.empty();
}
public Bytes32 getAccountHash() {
return accountHash;
}
public Bytes32 getCodeHash() {
return codeHash;
}
public void setCode(final Bytes code) {
this.code = code;
}
@Override
public long getPriority() {
return 0;
}
@Override
public int getDepth() {
return 0;
}
}

View File

@@ -0,0 +1,175 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync.request;
import static org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest.MAX_CHILDREN;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.RequestType;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloaderException;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
public abstract class SnapDataRequest implements TasksPriorityProvider {
public static final int MAX_CHILD = 16;
protected Optional<TrieNodeDataRequest> possibleParent = Optional.empty();
protected int depth;
protected long priority;
protected final AtomicInteger pendingChildren = new AtomicInteger(0);
private final RequestType requestType;
private Hash rootHash;
protected SnapDataRequest(final RequestType requestType, final Hash originalRootHash) {
this.requestType = requestType;
this.rootHash = originalRootHash;
}
public static AccountRangeDataRequest createAccountRangeDataRequest(
final Hash rootHash, final Bytes32 startKeyHash, final Bytes32 endKeyHash) {
return new AccountRangeDataRequest(rootHash, startKeyHash, endKeyHash);
}
public static AccountRangeDataRequest createAccountDataRequest(
final Hash rootHash,
final Hash accountHash,
final Bytes32 startStorageRange,
final Bytes32 endStorageRange) {
return new AccountRangeDataRequest(rootHash, accountHash, startStorageRange, endStorageRange);
}
public StorageRangeDataRequest createStorageRangeDataRequest(
final Bytes32 accountHash,
final Bytes32 storageRoot,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash) {
return new StorageRangeDataRequest(
getRootHash(), accountHash, storageRoot, startKeyHash, endKeyHash);
}
public static AccountTrieNodeDataRequest createAccountTrieNodeDataRequest(
final Hash hash, final Bytes location, final HashSet<Bytes> accountHeals) {
return new AccountTrieNodeDataRequest(hash, hash, location, accountHeals);
}
public static AccountTrieNodeDataRequest createAccountTrieNodeDataRequest(
final Hash hash,
final Hash rootHash,
final Bytes location,
final HashSet<Bytes> accountHeals) {
return new AccountTrieNodeDataRequest(hash, rootHash, location, accountHeals);
}
public static StorageTrieNodeDataRequest createStorageTrieNodeDataRequest(
final Hash hash, final Hash accountHash, final Hash rootHash, final Bytes location) {
return new StorageTrieNodeDataRequest(hash, accountHash, rootHash, location);
}
public BytecodeRequest createBytecodeRequest(final Bytes32 accountHash, final Bytes32 codeHash) {
return new BytecodeRequest(getRootHash(), accountHash, codeHash);
}
public int persist(
final WorldStateStorage worldStateStorage,
final WorldStateStorage.Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
return doPersist(worldStateStorage, updater, downloadState, snapSyncState);
}
protected abstract int doPersist(
final WorldStateStorage worldStateStorage,
final WorldStateStorage.Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState);
public abstract boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
SnapSyncState snapSyncState);
public abstract boolean isValid();
public boolean isExpired(final SnapSyncState snapSyncState) {
return false;
}
public abstract Stream<SnapDataRequest> getChildRequests(
final SnapWorldDownloadState downloadState,
final WorldStateStorage worldStateStorage,
final SnapSyncState snapSyncState);
protected void registerParent(final TrieNodeDataRequest parent) {
if (this.possibleParent.isPresent()) {
throw new WorldStateDownloaderException("Cannot set parent twice");
}
this.possibleParent = Optional.of(parent);
this.depth = parent.depth + 1;
this.priority = parent.priority * MAX_CHILDREN + parent.incrementChildren();
}
protected int incrementChildren() {
return pendingChildren.incrementAndGet();
}
protected int saveParent(
final WorldStateStorage worldStateStorage,
final WorldStateStorage.Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
if (pendingChildren.decrementAndGet() == 0) {
return persist(worldStateStorage, updater, downloadState, snapSyncState);
}
return 0;
}
public void clear() {}
public RequestType getRequestType() {
return requestType;
}
public Hash getRootHash() {
return rootHash;
}
public void setRootHash(final Hash rootHash) {
this.rootHash = rootHash;
}
@Override
public long getPriority() {
return 0;
}
@Override
public int getDepth() {
return 0;
}
}

View File

@@ -0,0 +1,226 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync.request;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager.MAX_RANGE;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager.MIN_RANGE;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager.findNewBeginElementInRange;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RequestType.STORAGE_RANGE;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.RangeManager;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.StackTrie;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.trie.NodeUpdater;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import kotlin.collections.ArrayDeque;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Returns a list of storages and the merkle proofs of an entire range */
public class StorageRangeDataRequest extends SnapDataRequest {
private static final Logger LOG = LoggerFactory.getLogger(StorageRangeDataRequest.class);
private final Bytes32 accountHash;
private final Bytes32 storageRoot;
private final Bytes32 startKeyHash;
private final Bytes32 endKeyHash;
private StackTrie stackTrie;
private TreeMap<Bytes32, Bytes> slots;
private ArrayDeque<Bytes> proofs;
private boolean isProofValid;
protected StorageRangeDataRequest(
final Hash rootHash,
final Bytes32 accountHash,
final Bytes32 storageRoot,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash) {
super(STORAGE_RANGE, rootHash);
this.accountHash = accountHash;
this.storageRoot = storageRoot;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
this.proofs = new ArrayDeque<>();
this.slots = new TreeMap<>();
addStackTrie(Optional.empty());
LOG.trace(
"create get storage range data request for account {} with root hash={} from {} to {}",
accountHash,
rootHash,
startKeyHash,
endKeyHash);
}
@Override
protected int doPersist(
final WorldStateStorage worldStateStorage,
final Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
if (isProofValid) {
stackTrie.addKeys(slots);
stackTrie.addProofs(proofs);
}
// search incomplete nodes in the range
final AtomicInteger nbNodesSaved = new AtomicInteger();
final AtomicReference<Updater> updaterTmp = new AtomicReference<>(worldStateStorage.updater());
final NodeUpdater nodeUpdater =
(location, hash, value) -> {
// create small batch in order to commit small amount of nodes at the same time
updaterTmp.get().putAccountStorageTrieNode(Hash.wrap(accountHash), location, hash, value);
if (nbNodesSaved.getAndIncrement() % 1000 == 0) {
updaterTmp.get().commit();
updaterTmp.set(worldStateStorage.updater());
}
};
stackTrie.commit(nodeUpdater);
updaterTmp.get().commit();
return nbNodesSaved.get();
}
@Override
public boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
downloadState.enqueueRequest(
createAccountDataRequest(
getRootHash(), Hash.wrap(accountHash), startKeyHash, endKeyHash));
isProofValid = false;
} else {
isProofValid = true;
}
return isProofValid;
}
@Override
public boolean isValid() {
return !slots.isEmpty() || !proofs.isEmpty();
}
@Override
public boolean isExpired(final SnapSyncState snapSyncState) {
return snapSyncState.isExpired(this);
}
@Override
public Stream<SnapDataRequest> getChildRequests(
final SnapWorldDownloadState downloadState,
final WorldStateStorage worldStateStorage,
final SnapSyncState snapSyncState) {
final List<SnapDataRequest> childRequests = new ArrayList<>();
if (!isProofValid) {
return Stream.empty();
}
findNewBeginElementInRange(storageRoot, proofs, slots, endKeyHash)
.ifPresent(
missingRightElement -> {
final int nbRanges = findNbRanges();
RangeManager.generateRanges(missingRightElement, endKeyHash, nbRanges)
.forEach(
(key, value) -> {
final StorageRangeDataRequest storageRangeDataRequest =
createStorageRangeDataRequest(accountHash, storageRoot, key, value);
storageRangeDataRequest.addStackTrie(Optional.of(stackTrie));
childRequests.add(storageRangeDataRequest);
});
if (!snapSyncState.isHealInProgress()
&& startKeyHash.equals(MIN_RANGE)
&& endKeyHash.equals(MAX_RANGE)) {
// need to heal this account storage
downloadState.addInconsistentAccount(CompactEncoding.bytesToPath(accountHash));
}
});
return childRequests.stream();
}
private int findNbRanges() {
if (startKeyHash.equals(MIN_RANGE) && endKeyHash.equals(MAX_RANGE)) {
final int nbRangesNeeded =
MAX_RANGE
.toUnsignedBigInteger()
.divide(
slots
.lastKey()
.toUnsignedBigInteger()
.subtract(startKeyHash.toUnsignedBigInteger()))
.intValue();
if (nbRangesNeeded >= MAX_CHILD) {
return MAX_CHILD;
}
return nbRangesNeeded;
}
return 1;
}
public Bytes32 getAccountHash() {
return accountHash;
}
public Bytes32 getStorageRoot() {
return storageRoot;
}
public Bytes32 getStartKeyHash() {
return startKeyHash;
}
public Bytes32 getEndKeyHash() {
return endKeyHash;
}
public void setProofs(final ArrayDeque<Bytes> proofs) {
this.proofs = proofs;
}
public void setSlots(final TreeMap<Bytes32, Bytes> slots) {
this.slots = slots;
}
public void addStackTrie(final Optional<StackTrie> maybeStackTrie) {
stackTrie =
maybeStackTrie
.filter(StackTrie::addSegment)
.orElse(new StackTrie(getRootHash(), 1, 3, startKeyHash));
}
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync.request;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.rlp.RLP;
public class StorageTrieNodeDataRequest extends TrieNodeDataRequest {
final Hash accountHash;
StorageTrieNodeDataRequest(
final Hash nodeHash, final Hash accountHash, final Hash rootHash, final Bytes location) {
super(nodeHash, rootHash, location);
this.accountHash = accountHash;
}
@Override
protected int doPersist(
final WorldStateStorage worldStateStorage,
final Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
updater.putAccountStorageTrieNode(getAccountHash(), getLocation(), getNodeHash(), data);
return 1;
}
@Override
public Optional<Bytes> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage
.getAccountStorageTrieNode(getAccountHash(), getLocation(), getNodeHash())
.filter(data -> Hash.hash(data).equals(getNodeHash()));
}
@Override
protected SnapDataRequest createChildNodeDataRequest(final Hash childHash, final Bytes location) {
return createStorageTrieNodeDataRequest(childHash, getAccountHash(), getRootHash(), location);
}
@Override
protected Stream<SnapDataRequest> getRequestsFromTrieNodeValue(
final WorldStateStorage worldStateStorage,
final Bytes location,
final Bytes path,
final Bytes value) {
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
((BonsaiWorldStateKeyValueStorage.Updater) worldStateStorage.updater())
.putStorageValueBySlotHash(
accountHash, getSlotHash(location, path), Bytes32.leftPad(RLP.decodeValue(value)))
.commit();
}
return Stream.empty();
}
public Hash getAccountHash() {
return accountHash;
}
private Hash getSlotHash(final Bytes location, final Bytes path) {
return Hash.wrap(Bytes32.wrap(CompactEncoding.pathToBytes(Bytes.concatenate(location, path))));
}
@Override
public List<Bytes> getTrieNodePath() {
return List.of(accountHash, CompactEncoding.encode(getLocation()));
}
}

View File

@@ -0,0 +1,196 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync.request;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.RequestType.TRIE_NODE;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldDownloadState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.trie.Node;
import org.hyperledger.besu.ethereum.trie.TrieNodeDecoder;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
public abstract class TrieNodeDataRequest extends SnapDataRequest implements TasksPriorityProvider {
private final Bytes32 nodeHash;
private final Bytes location;
protected Bytes data;
protected boolean isInCache = false;
protected boolean requiresPersisting = true;
protected TrieNodeDataRequest(final Hash nodeHash, final Hash rootHash, final Bytes location) {
super(TRIE_NODE, rootHash);
this.nodeHash = nodeHash;
this.location = location;
this.data = Bytes.EMPTY;
}
@Override
public int persist(
final WorldStateStorage worldStateStorage,
final WorldStateStorage.Updater updater,
final WorldDownloadState<SnapDataRequest> downloadState,
final SnapSyncState snapSyncState) {
if (!isValid() || isExpired(snapSyncState) || pendingChildren.get() > 0) {
// we do nothing. Our last child will eventually persist us.
return 0;
}
int saved = 0;
if (requiresPersisting) {
checkNotNull(data, "Must set data before node can be persisted.");
saved = doPersist(worldStateStorage, updater, downloadState, snapSyncState);
}
if (possibleParent.isPresent()) {
return possibleParent
.get()
.saveParent(worldStateStorage, updater, downloadState, snapSyncState)
+ saved;
}
return saved;
}
@Override
public Stream<SnapDataRequest> getChildRequests(
final SnapWorldDownloadState downloadState,
final WorldStateStorage worldStateStorage,
final SnapSyncState snapSyncState) {
if (!isValid()) {
// If this node hasn't been downloaded yet, we can't return any child data
return Stream.empty();
}
final List<Node<Bytes>> nodes = TrieNodeDecoder.decodeNodes(location, data);
return nodes.stream()
.flatMap(
node -> {
if (nodeIsHashReferencedDescendant(node)) {
return Stream.of(
createChildNodeDataRequest(
Hash.wrap(node.getHash()), node.getLocation().orElse(Bytes.EMPTY)));
} else {
return node.getValue()
.map(
value ->
getRequestsFromTrieNodeValue(
worldStateStorage,
node.getLocation().orElse(Bytes.EMPTY),
node.getPath(),
value))
.orElseGet(Stream::empty);
}
})
.peek(request -> request.registerParent(this));
}
@Override
public boolean checkProof(
final WorldDownloadState<SnapDataRequest> downloadState,
final WorldStateProofProvider worldStateProofProvider,
final SnapSyncState snapSyncState) {
return true;
}
public boolean isRoot() {
return possibleParent.isEmpty();
}
@Override
public boolean isValid() {
return !data.isEmpty() && Hash.hash(data).equals(getNodeHash());
}
@Override
public boolean isExpired(final SnapSyncState snapSyncState) {
return snapSyncState.isExpired(this);
}
public boolean isInCache() {
return isInCache;
}
public boolean isRequiresPersisting() {
return requiresPersisting;
}
public void setInCache(final boolean inCache) {
isInCache = inCache;
}
public Bytes32 getNodeHash() {
return nodeHash;
}
public Bytes getLocation() {
return location;
}
@Override
public int getDepth() {
return depth;
}
@Override
public long getPriority() {
return priority;
}
public Bytes getPathId() {
return Bytes.concatenate(new ArrayList<>(getTrieNodePath()));
}
public void setData(final Bytes data) {
this.data = data;
}
public void setRequiresPersisting(final boolean requiresPersisting) {
this.requiresPersisting = requiresPersisting;
}
private boolean nodeIsHashReferencedDescendant(final Node<Bytes> node) {
return !Objects.equals(node.getHash(), nodeHash) && node.isReferencedByHash();
}
public abstract Optional<Bytes> getExistingData(final WorldStateStorage worldStateStorage);
public abstract List<Bytes> getTrieNodePath();
protected abstract SnapDataRequest createChildNodeDataRequest(
final Hash childHash, final Bytes location);
public Stream<SnapDataRequest> getRootStorageRequests(final WorldStateStorage worldStateStorage) {
return Stream.empty();
}
protected abstract Stream<SnapDataRequest> getRequestsFromTrieNodeValue(
final WorldStateStorage worldStateStorage,
final Bytes location,
final Bytes path,
final Bytes value);
}

View File

@@ -52,13 +52,17 @@ public abstract class WorldDownloadState<REQUEST extends TasksPriorityProvider>
private final long minMillisBeforeStalling;
private volatile long timestampOfLastProgress;
protected Bytes rootNodeData;
protected final WorldStateStorage worldStateStorage;
protected WorldStateDownloadProcess worldStateDownloadProcess;
public WorldDownloadState(
final WorldStateStorage worldStateStorage,
final InMemoryTasksPriorityQueues<REQUEST> pendingRequests,
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock) {
this.worldStateStorage = worldStateStorage;
this.minMillisBeforeStalling = minMillisBeforeStalling;
this.timestampOfLastProgress = clock.millis();
this.downloadWasResumed = !pendingRequests.isEmpty();
@@ -104,10 +108,8 @@ public abstract class WorldDownloadState<REQUEST extends TasksPriorityProvider>
LOG.info("World state download failed. ", error);
}
}
for (final EthTask<?> outstandingRequest : outstandingRequests) {
outstandingRequest.cancel();
}
pendingRequests.clear();
cleanupQueues();
if (error != null) {
if (worldStateDownloadProcess != null) {
@@ -119,6 +121,13 @@ public abstract class WorldDownloadState<REQUEST extends TasksPriorityProvider>
}
}
protected synchronized void cleanupQueues() {
for (final EthTask<?> outstandingRequest : outstandingRequests) {
outstandingRequest.cancel();
}
pendingRequests.clear();
}
public boolean downloadWasResumed() {
return downloadWasResumed;
}
@@ -237,6 +246,5 @@ public abstract class WorldDownloadState<REQUEST extends TasksPriorityProvider>
this.worldStateDownloadProcess = worldStateDownloadProcess;
}
public abstract boolean checkCompletion(
final WorldStateStorage worldStateStorage, final BlockHeader header);
public abstract boolean checkCompletion(final BlockHeader header);
}

View File

@@ -23,7 +23,6 @@ import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Optional;
@@ -34,13 +33,12 @@ import org.junit.Test;
public class CompleteTaskStepTest {
private static final Hash ROOT_HASH = Hash.hash(Bytes.of(1, 2, 3));
private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class);
private final FastWorldDownloadState downloadState = mock(FastWorldDownloadState.class);
private final BlockHeader blockHeader =
new BlockHeaderTestFixture().stateRoot(ROOT_HASH).buildHeader();
private final CompleteTaskStep completeTaskStep =
new CompleteTaskStep(worldStateStorage, new NoOpMetricsSystem(), () -> 3);
new CompleteTaskStep(new NoOpMetricsSystem(), () -> 3);
@Test
public void shouldMarkTaskAsFailedIfItDoesNotHaveData() {
@@ -52,7 +50,7 @@ public class CompleteTaskStepTest {
assertThat(task.isCompleted()).isFalse();
assertThat(task.isFailed()).isTrue();
verify(downloadState).notifyTaskAvailable();
verify(downloadState, never()).checkCompletion(worldStateStorage, blockHeader);
verify(downloadState, never()).checkCompletion(blockHeader);
}
@Test
@@ -64,7 +62,7 @@ public class CompleteTaskStepTest {
assertThat(task.isCompleted()).isTrue();
assertThat(task.isFailed()).isFalse();
verify(downloadState).checkCompletion(worldStateStorage, blockHeader);
verify(downloadState).checkCompletion(blockHeader);
}
private StubTask validTask() {

View File

@@ -65,11 +65,9 @@ public class FastWorldDownloadStateTest {
mock(WorldStateDownloadProcess.class);
private final TestClock clock = new TestClock();
private final FastWorldDownloadState downloadState =
new FastWorldDownloadState(
pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS, MIN_MILLIS_BEFORE_STALLING, clock);
private FastWorldDownloadState downloadState;
private final CompletableFuture<Void> future = downloadState.getDownloadFuture();
private CompletableFuture<Void> future;
@Parameterized.Parameters
public static Collection<Object[]> data() {
@@ -84,19 +82,27 @@ public class FastWorldDownloadStateTest {
@Before
public void setUp() {
downloadState.setRootNodeData(ROOT_NODE_DATA);
assertThat(downloadState.isDownloading()).isTrue();
if (storageFormat == DataStorageFormat.BONSAI) {
worldStateStorage =
new BonsaiWorldStateKeyValueStorage(new InMemoryKeyValueStorageProvider());
} else {
worldStateStorage = new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
}
downloadState =
new FastWorldDownloadState(
worldStateStorage,
pendingRequests,
MAX_REQUESTS_WITHOUT_PROGRESS,
MIN_MILLIS_BEFORE_STALLING,
clock);
assertThat(downloadState.isDownloading()).isTrue();
downloadState.setRootNodeData(ROOT_NODE_DATA);
future = downloadState.getDownloadFuture();
}
@Test
public void shouldCompleteReturnedFutureWhenNoPendingTasksRemain() {
downloadState.checkCompletion(worldStateStorage, header);
downloadState.checkCompletion(header);
assertThat(future).isCompleted();
assertThat(downloadState.isDownloading()).isFalse();
@@ -110,7 +116,7 @@ public class FastWorldDownloadStateTest {
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH))
.contains(ROOT_NODE_DATA));
downloadState.checkCompletion(worldStateStorage, header);
downloadState.checkCompletion(header);
assertThat(future).isCompleted();
assertThat(postFutureChecks).isCompleted();
@@ -121,7 +127,7 @@ public class FastWorldDownloadStateTest {
pendingRequests.add(
NodeDataRequest.createAccountDataRequest(Hash.EMPTY_TRIE_HASH, Optional.empty()));
downloadState.checkCompletion(worldStateStorage, header);
downloadState.checkCompletion(header);
assertThat(future).isNotDone();
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH)).isEmpty();
@@ -208,7 +214,7 @@ public class FastWorldDownloadStateTest {
@Test
public void shouldNotAddRequestsAfterDownloadIsCompleted() {
downloadState.checkCompletion(worldStateStorage, header);
downloadState.checkCompletion(header);
downloadState.enqueueRequests(
Stream.of(

View File

@@ -0,0 +1,225 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadProcess;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import org.hyperledger.besu.testutil.TestClock;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class SnapWorldDownloadStateTest {
private static final Bytes ROOT_NODE_DATA = Bytes.of(1, 2, 3, 4);
private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA);
private static final int MAX_REQUESTS_WITHOUT_PROGRESS = 10;
private static final long MIN_MILLIS_BEFORE_STALLING = 50_000;
private WorldStateStorage worldStateStorage;
private final BlockHeader header =
new BlockHeaderTestFixture().stateRoot(ROOT_NODE_HASH).buildHeader();
private final InMemoryTasksPriorityQueues<SnapDataRequest> pendingRequests =
new InMemoryTasksPriorityQueues<>();
private final WorldStateDownloadProcess worldStateDownloadProcess =
mock(WorldStateDownloadProcess.class);
private final SnapSyncState snapSyncState = mock(SnapSyncState.class);
private final MetricsSystem metricsSystem = mock(MetricsSystem.class);
private final TestClock clock = new TestClock();
private SnapWorldDownloadState downloadState;
private CompletableFuture<Void> future;
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{DataStorageFormat.BONSAI}, {DataStorageFormat.FOREST}});
}
private final DataStorageFormat storageFormat;
public SnapWorldDownloadStateTest(final DataStorageFormat storageFormat) {
this.storageFormat = storageFormat;
}
@Before
public void setUp() {
if (storageFormat == DataStorageFormat.BONSAI) {
worldStateStorage =
new BonsaiWorldStateKeyValueStorage(new InMemoryKeyValueStorageProvider());
} else {
worldStateStorage = new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
}
downloadState =
new SnapWorldDownloadState(
worldStateStorage,
snapSyncState,
pendingRequests,
MAX_REQUESTS_WITHOUT_PROGRESS,
MIN_MILLIS_BEFORE_STALLING,
metricsSystem,
clock);
downloadState.setRootNodeData(ROOT_NODE_DATA);
future = downloadState.getDownloadFuture();
assertThat(downloadState.isDownloading()).isTrue();
}
@Test
public void shouldCompleteReturnedFutureWhenNoPendingTasksRemain() {
when(snapSyncState.isHealInProgress()).thenReturn(true);
downloadState.checkCompletion(header);
assertThat(future).isCompleted();
assertThat(downloadState.isDownloading()).isFalse();
}
@Test
public void shouldStartHealWhenNoSnapsyncPendingTasksRemain() {
when(snapSyncState.isHealInProgress()).thenReturn(false);
when(snapSyncState.getPivotBlockHeader()).thenReturn(Optional.of(mock(BlockHeader.class)));
assertThat(downloadState.pendingTrieNodeRequests.isEmpty()).isTrue();
downloadState.checkCompletion(header);
assertThat(downloadState.isDownloading()).isTrue();
assertThat(downloadState.pendingTrieNodeRequests.isEmpty()).isFalse();
}
@Test
public void shouldStoreRootNodeBeforeReturnedFutureCompletes() {
when(snapSyncState.isHealInProgress()).thenReturn(true);
final CompletableFuture<Void> postFutureChecks =
future.thenAccept(
result ->
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH))
.contains(ROOT_NODE_DATA));
downloadState.checkCompletion(header);
assertThat(future).isCompleted();
assertThat(postFutureChecks).isCompleted();
}
@Test
public void shouldNotCompleteWhenThereAreAccountPendingTasks() {
when(snapSyncState.isHealInProgress()).thenReturn(false);
downloadState.pendingAccountRequests.add(
SnapDataRequest.createAccountDataRequest(
Hash.EMPTY_TRIE_HASH,
Hash.wrap(Bytes32.random()),
RangeManager.MIN_RANGE,
RangeManager.MAX_RANGE));
downloadState.checkCompletion(header);
assertThat(future).isNotDone();
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH)).isEmpty();
assertThat(downloadState.isDownloading()).isTrue();
}
@Test
public void shouldNotCompleteWhenThereAreStoragePendingTasks() {
when(snapSyncState.isHealInProgress()).thenReturn(false);
downloadState.pendingStorageRequests.add(
SnapDataRequest.createStorageTrieNodeDataRequest(
Hash.EMPTY_TRIE_HASH, Hash.wrap(Bytes32.random()), Hash.EMPTY_TRIE_HASH, Bytes.EMPTY));
downloadState.checkCompletion(header);
assertThat(future).isNotDone();
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH)).isEmpty();
assertThat(downloadState.isDownloading()).isTrue();
downloadState.pendingBigStorageRequests.add(
SnapDataRequest.createStorageTrieNodeDataRequest(
Hash.EMPTY_TRIE_HASH, Hash.wrap(Bytes32.random()), Hash.EMPTY_TRIE_HASH, Bytes.EMPTY));
downloadState.checkCompletion(header);
assertThat(future).isNotDone();
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH)).isEmpty();
assertThat(downloadState.isDownloading()).isTrue();
}
@Test
public void shouldNotCompleteWhenThereAreTriePendingTasks() {
when(snapSyncState.isHealInProgress()).thenReturn(true);
downloadState.pendingTrieNodeRequests.add(
SnapDataRequest.createAccountTrieNodeDataRequest(
Hash.wrap(Bytes32.random()), Bytes.EMPTY, new HashSet<>()));
downloadState.checkCompletion(header);
assertThat(future).isNotDone();
assertThat(worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, ROOT_NODE_HASH)).isEmpty();
assertThat(downloadState.isDownloading()).isTrue();
}
@Test
public void shouldCancelOutstandingTasksWhenFutureIsCancelled() {
final EthTask<?> outstandingTask1 = mock(EthTask.class);
final EthTask<?> outstandingTask2 = mock(EthTask.class);
downloadState.addOutstandingTask(outstandingTask1);
downloadState.addOutstandingTask(outstandingTask2);
downloadState.pendingAccountRequests.add(
SnapDataRequest.createAccountDataRequest(
Hash.EMPTY_TRIE_HASH,
Hash.wrap(Bytes32.random()),
RangeManager.MIN_RANGE,
RangeManager.MAX_RANGE));
downloadState.pendingStorageRequests.add(
SnapDataRequest.createStorageTrieNodeDataRequest(
Hash.EMPTY_TRIE_HASH, Hash.wrap(Bytes32.random()), Hash.EMPTY_TRIE_HASH, Bytes.EMPTY));
downloadState.setWorldStateDownloadProcess(worldStateDownloadProcess);
future.cancel(true);
verify(outstandingTask1).cancel();
verify(outstandingTask2).cancel();
assertThat(downloadState.pendingAccountRequests.isEmpty()).isTrue();
assertThat(downloadState.pendingStorageRequests.isEmpty()).isTrue();
verify(worldStateDownloadProcess).abort();
assertThat(downloadState.isDownloading()).isFalse();
}
}

View File

@@ -0,0 +1,143 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.TrieGenerator;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.RangeStorageEntriesCollector;
import org.hyperledger.besu.ethereum.trie.TrieIterator;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.List;
import java.util.TreeMap;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.assertj.core.api.Assertions;
import org.junit.Test;
public class StackTrieTest {
final Bytes32 lastAccount = RangeManager.MIN_RANGE;
@Test
public void shouldNotSaveTheRootWhenIncomplete() {
final int nbAccounts = 15;
final WorldStateStorage worldStateStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final WorldStateStorage recreatedWorldStateStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
TrieGenerator.generateTrie(worldStateStorage, nbAccounts);
final StackTrie stackTrie =
new StackTrie(Hash.wrap(accountStateTrie.getRootHash()), 0, 256, lastAccount);
stackTrie.addSegment();
final RangeStorageEntriesCollector collector =
RangeStorageEntriesCollector.createCollector(
lastAccount, RangeManager.MAX_RANGE, 5, Integer.MAX_VALUE);
final TrieIterator<Bytes> visitor = RangeStorageEntriesCollector.createVisitor(collector);
final TreeMap<Bytes32, Bytes> accounts =
(TreeMap<Bytes32, Bytes>)
accountStateTrie.entriesFrom(
root ->
RangeStorageEntriesCollector.collectEntries(
collector, visitor, root, lastAccount));
final WorldStateProofProvider worldStateProofProvider =
new WorldStateProofProvider(worldStateStorage);
// generate the proof
final List<Bytes> proofs =
worldStateProofProvider.getAccountProofRelatedNodes(
Hash.wrap(accountStateTrie.getRootHash()), lastAccount);
proofs.addAll(
worldStateProofProvider.getAccountProofRelatedNodes(
Hash.wrap(accountStateTrie.getRootHash()), accounts.lastKey()));
stackTrie.addKeys(accounts);
stackTrie.addProofs(proofs);
final WorldStateStorage.Updater updater = recreatedWorldStateStorage.updater();
stackTrie.commit(updater::putAccountStateTrieNode);
updater.commit();
Assertions.assertThat(
recreatedWorldStateStorage.getAccountStateTrieNode(
Bytes.EMPTY, accountStateTrie.getRootHash()))
.isEmpty();
}
@Test
public void shouldSaveTheRootWhenComplete() {
final int nbAccounts = 15;
final WorldStateStorage worldStateStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final WorldStateStorage recreatedWorldStateStorage =
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage());
final MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
TrieGenerator.generateTrie(worldStateStorage, nbAccounts);
final StackTrie stackTrie =
new StackTrie(Hash.wrap(accountStateTrie.getRootHash()), 0, 256, lastAccount);
for (int i = 0; i < nbAccounts; i += 5) {
stackTrie.addSegment();
final RangeStorageEntriesCollector collector =
RangeStorageEntriesCollector.createCollector(
lastAccount, RangeManager.MAX_RANGE, 5, Integer.MAX_VALUE);
final TrieIterator<Bytes> visitor = RangeStorageEntriesCollector.createVisitor(collector);
final TreeMap<Bytes32, Bytes> accounts =
(TreeMap<Bytes32, Bytes>)
accountStateTrie.entriesFrom(
root ->
RangeStorageEntriesCollector.collectEntries(
collector, visitor, root, lastAccount));
final WorldStateProofProvider worldStateProofProvider =
new WorldStateProofProvider(worldStateStorage);
// generate the proof
final List<Bytes> proofs =
worldStateProofProvider.getAccountProofRelatedNodes(
Hash.wrap(accountStateTrie.getRootHash()), lastAccount);
proofs.addAll(
worldStateProofProvider.getAccountProofRelatedNodes(
Hash.wrap(accountStateTrie.getRootHash()), accounts.lastKey()));
stackTrie.addKeys(accounts);
stackTrie.addProofs(proofs);
final WorldStateStorage.Updater updater = recreatedWorldStateStorage.updater();
stackTrie.commit(updater::putAccountStateTrieNode);
updater.commit();
}
Assertions.assertThat(
worldStateStorage.getAccountStateTrieNode(Bytes.EMPTY, accountStateTrie.getRootHash()))
.isPresent();
}
}

View File

@@ -46,6 +46,7 @@ class BranchNode<V> implements Node<V> {
private WeakReference<Bytes> rlp;
private SoftReference<Bytes32> hash;
private boolean dirty = false;
private boolean needHeal = false;
BranchNode(
final Bytes location,
@@ -258,4 +259,14 @@ class BranchNode<V> implements Node<V> {
public void markDirty() {
dirty = true;
}
@Override
public boolean isHealNeeded() {
return needHeal;
}
@Override
public void markHealNeeded() {
this.needHeal = true;
}
}

View File

@@ -66,7 +66,7 @@ public class CommitVisitor<V> implements LocationNodeVisitor<V> {
@Override
public void visit(final Bytes location, final NullNode<V> nullNode) {}
private void maybeStoreNode(final Bytes location, final Node<V> node) {
public void maybeStoreNode(final Bytes location, final Node<V> node) {
final Bytes nodeRLP = node.getRlp();
if (nodeRLP.size() >= 32) {
this.nodeUpdater.store(location, node.getHash(), nodeRLP);

View File

@@ -37,6 +37,7 @@ class ExtensionNode<V> implements Node<V> {
private WeakReference<Bytes> rlp;
private SoftReference<Bytes32> hash;
private boolean dirty = false;
private boolean needHeal = false;
ExtensionNode(
final Bytes location,
@@ -179,4 +180,14 @@ class ExtensionNode<V> implements Node<V> {
public void markDirty() {
dirty = true;
}
@Override
public boolean isHealNeeded() {
return needHeal;
}
@Override
public void markHealNeeded() {
this.needHeal = true;
}
}

View File

@@ -165,4 +165,14 @@ class LeafNode<V> implements Node<V> {
public void markDirty() {
dirty = true;
}
@Override
public boolean isHealNeeded() {
return false;
}
@Override
public void markHealNeeded() {
// nothing to do a leaf don't have child
}
}

View File

@@ -42,6 +42,14 @@ public interface MerklePatriciaTrie<K, V> {
*/
Optional<V> get(K key);
/**
* Returns an {@code Optional} of value mapped to the given path if it exists; otherwise empty.
*
* @param path The path for the value.
* @return an {@code Optional} of value mapped to the given path if it exists; otherwise empty
*/
Optional<V> getPath(final K path);
/**
* Returns value and ordered proof-related nodes mapped to the hash if it exists; otherwise empty.
*
@@ -59,6 +67,15 @@ public interface MerklePatriciaTrie<K, V> {
*/
void put(K key, V value);
/**
* Updates the value mapped to the specified key, creating the mapping if one does not already
* exist.
*
* @param key The key that corresponds to the value to be updated.
* @param putVisitor custom visitor for the update
*/
void put(K key, PutVisitor<V> putVisitor);
/**
* Deletes the value mapped to the specified key, if such a value exists (Optional operation).
*
@@ -88,6 +105,14 @@ public interface MerklePatriciaTrie<K, V> {
*/
void commit(NodeUpdater nodeUpdater);
/**
* Commits any pending changes to the underlying storage.
*
* @param nodeUpdater used to store the node values
* @param commitVisitor custom visitor for the commit
*/
void commit(NodeUpdater nodeUpdater, CommitVisitor<V> commitVisitor);
/**
* Retrieve up to {@code limit} storage entries beginning from the first entry with hash equal to
* or greater than {@code startKeyHash}.

View File

@@ -41,6 +41,11 @@ public class MissingNode<V> extends NullNode<V> {
return path;
}
@Override
public boolean isHealNeeded() {
return true;
}
@Override
public Optional<Bytes> getLocation() {
return Optional.ofNullable(location);

View File

@@ -71,4 +71,16 @@ public interface Node<V> {
/** Unloads the node if it is, for example, a StoredNode. */
default void unload() {}
/**
* Return if a node needs heal. If one of its children missing in the storage
*
* @return true if the node need heal
*/
boolean isHealNeeded();
/**
* Marking a node as need heal means that one of its children is not yet present in the storage
*/
void markHealNeeded();
}

View File

@@ -96,4 +96,14 @@ public class NullNode<V> implements Node<V> {
public void markDirty() {
// do nothing
}
@Override
public boolean isHealNeeded() {
return false;
}
@Override
public void markHealNeeded() {
// do nothing
}
}

View File

@@ -28,7 +28,6 @@ public class PutVisitor<V> implements PathNodeVisitor<V> {
@Override
public Node<V> visit(final ExtensionNode<V> extensionNode, final Bytes path) {
final Bytes extensionPath = extensionNode.getPath();
final int commonPathLength = extensionPath.commonPrefixLength(path);
assert commonPathLength < path.size()
: "Visiting path doesn't end with a non-matching terminator";

View File

@@ -158,6 +158,17 @@ public class RestoreVisitor<V> implements PathNodeVisitor<V> {
"A persisted node cannot ever be dirty since it's loaded from storage");
}
@Override
public boolean isHealNeeded() {
return false;
}
@Override
public void markHealNeeded() {
throw new UnsupportedOperationException(
"A persisted node cannot be healed since it's loaded from storage");
}
@Override
public Node<V> accept(final PathNodeVisitor<V> visitor, final Bytes path) {
// do nothing

View File

@@ -59,6 +59,12 @@ public class SimpleMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
return root.accept(getVisitor, bytesToPath(key)).getValue();
}
@Override
public Optional<V> getPath(final K path) {
checkNotNull(path);
return root.accept(getVisitor, path).getValue();
}
@Override
public Proof<V> getValueWithProof(final K key) {
checkNotNull(key);
@@ -76,6 +82,12 @@ public class SimpleMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
this.root = root.accept(new PutVisitor<>(nodeFactory, value), bytesToPath(key));
}
@Override
public void put(final K key, final PutVisitor<V> putVisitor) {
checkNotNull(key);
this.root = root.accept(putVisitor, bytesToPath(key));
}
@Override
public void remove(final K key) {
checkNotNull(key);
@@ -103,6 +115,11 @@ public class SimpleMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
// Nothing to do here
}
@Override
public void commit(final NodeUpdater nodeUpdater, final CommitVisitor<V> commitVisitor) {
// Nothing to do here
}
@Override
public Map<Bytes32, V> entriesFrom(final Bytes32 startKeyHash, final int limit) {
return StorageEntriesCollector.collectEntries(root, startKeyHash, limit);

View File

@@ -0,0 +1,48 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.trie;
import org.apache.tuweni.bytes.Bytes;
public class SnapPutVisitor<V> extends PutVisitor<V> {
public SnapPutVisitor(final NodeFactory<V> nodeFactory, final V value) {
super(nodeFactory, value);
}
@Override
public Node<V> visit(final BranchNode<V> branchNode, final Bytes path) {
final Node<V> visit = super.visit(branchNode, path);
for (Node<V> child : visit.getChildren()) {
if (child.isHealNeeded() || (child instanceof StoredNode && child.getValue().isEmpty())) {
visit.markHealNeeded(); // not save an incomplete node
return visit;
}
}
return visit;
}
@Override
public Node<V> visit(final ExtensionNode<V> extensionNode, final Bytes path) {
final Node<V> visit = super.visit(extensionNode, path);
for (Node<V> child : visit.getChildren()) {
if (child.isHealNeeded() || (child instanceof StoredNode && child.getValue().isEmpty())) {
visit.markHealNeeded(); // not save an incomplete node
return visit;
}
}
return visit;
}
}

View File

@@ -58,6 +58,29 @@ public class StoredMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
this(nodeLoader, EMPTY_TRIE_NODE_HASH, valueSerializer, valueDeserializer);
}
/**
* Create a trie.
*
* @param nodeLoader The {@link NodeLoader} to retrieve node data from.
* @param rootHash The initial root has for the trie, which should be already present in {@code
* storage}.
* @param rootLocation The initial root location for the trie
* @param valueSerializer A function for serializing values to bytes.
* @param valueDeserializer A function for deserializing values from bytes.
*/
public StoredMerklePatriciaTrie(
final NodeLoader nodeLoader,
final Bytes32 rootHash,
final Bytes rootLocation,
final Function<V, Bytes> valueSerializer,
final Function<Bytes, V> valueDeserializer) {
this.nodeFactory = new StoredNodeFactory<>(nodeLoader, valueSerializer, valueDeserializer);
this.root =
rootHash.equals(EMPTY_TRIE_NODE_HASH)
? NullNode.instance()
: new StoredNode<>(nodeFactory, rootLocation, rootHash);
}
/**
* Create a trie.
*
@@ -72,11 +95,7 @@ public class StoredMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
final Bytes32 rootHash,
final Function<V, Bytes> valueSerializer,
final Function<Bytes, V> valueDeserializer) {
this.nodeFactory = new StoredNodeFactory<>(nodeLoader, valueSerializer, valueDeserializer);
this.root =
rootHash.equals(EMPTY_TRIE_NODE_HASH)
? NullNode.instance()
: new StoredNode<>(nodeFactory, Bytes.EMPTY, rootHash);
this(nodeLoader, rootHash, Bytes.EMPTY, valueSerializer, valueDeserializer);
}
/**
@@ -100,6 +119,12 @@ public class StoredMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
return root.accept(getVisitor, bytesToPath(key)).getValue();
}
@Override
public Optional<V> getPath(final K path) {
checkNotNull(path);
return root.accept(getVisitor, path).getValue();
}
@Override
public Proof<V> getValueWithProof(final K key) {
checkNotNull(key);
@@ -117,6 +142,12 @@ public class StoredMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
this.root = root.accept(new PutVisitor<>(nodeFactory, value), bytesToPath(key));
}
@Override
public void put(final K key, final PutVisitor<V> putVisitor) {
checkNotNull(key);
this.root = root.accept(putVisitor, bytesToPath(key));
}
@Override
public void remove(final K key) {
checkNotNull(key);
@@ -131,7 +162,11 @@ public class StoredMerklePatriciaTrie<K extends Bytes, V> implements MerklePatri
@Override
public void commit(final NodeUpdater nodeUpdater) {
final CommitVisitor<V> commitVisitor = new CommitVisitor<>(nodeUpdater);
commit(nodeUpdater, new CommitVisitor<>(nodeUpdater));
}
@Override
public void commit(final NodeUpdater nodeUpdater, final CommitVisitor<V> commitVisitor) {
root.accept(Bytes.EMPTY, commitVisitor);
// Make sure root node was stored
if (root.isDirty() && root.getRlpRef().size() < 32) {

View File

@@ -47,6 +47,17 @@ class StoredNode<V> implements Node<V> {
"A stored node cannot ever be dirty since it's loaded from storage");
}
@Override
public boolean isHealNeeded() {
return false;
}
@Override
public void markHealNeeded() {
throw new IllegalStateException(
"A stored node cannot be healed since it's loaded from storage");
}
@Override
public Node<V> accept(final PathNodeVisitor<V> visitor, final Bytes path) {
final Node<V> node = load();

View File

@@ -52,7 +52,7 @@ public abstract class AbstractMerklePatriciaTrieTest {
@Test(expected = NullPointerException.class)
public void throwsOnUpdateWithNull() {
trie.put(Bytes.EMPTY, null);
trie.put(Bytes.EMPTY, (String) null);
}
@Test

View File

@@ -0,0 +1,125 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.trie;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyByte;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.plugin.data.Hash;
import java.util.ArrayList;
import java.util.Optional;
import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@SuppressWarnings("unchecked")
public class SnapPutVisitorTest {
@Test
public void shouldDetectValidBranch() {
final StoredNodeFactory<Bytes> storedNodeFactory = mock(StoredNodeFactory.class);
when(storedNodeFactory.createBranch(any(), any()))
.thenReturn(
new LeafNode<Bytes>(
Bytes.EMPTY, Bytes.of(0x00), storedNodeFactory, Function.identity()));
final ArrayList<Node<Bytes>> children = new ArrayList<>();
for (int i = 0; i < BranchNode.RADIX; i++) {
children.add(new StoredNode<>(storedNodeFactory, Bytes.EMPTY, Hash.ZERO));
}
final BranchNode<Bytes> invalidBranchNode =
new BranchNode<>(
Bytes.EMPTY,
children,
Optional.of(Bytes.of(0x00)),
storedNodeFactory,
Function.identity());
final SnapPutVisitor<Bytes> snapPutVisitor =
new SnapPutVisitor<>(storedNodeFactory, Bytes.EMPTY);
Node<Bytes> visit =
snapPutVisitor.visit(invalidBranchNode, Bytes.of(CompactEncoding.LEAF_TERMINATOR));
Assertions.assertThat(visit.isHealNeeded()).isFalse();
}
@Test
public void shouldDetectBranchWithMissingChildren() {
final StoredNodeFactory<Bytes> storedNodeFactory = mock(StoredNodeFactory.class);
when(storedNodeFactory.createBranch(any(), any()))
.thenReturn(new MissingNode<>(Hash.ZERO, Bytes.EMPTY));
final ArrayList<Node<Bytes>> children = new ArrayList<>();
for (int i = 0; i < BranchNode.RADIX; i++) {
children.add(new StoredNode<>(storedNodeFactory, Bytes.EMPTY, Hash.ZERO));
}
final BranchNode<Bytes> invalidBranchNode =
new BranchNode<>(
Bytes.EMPTY,
children,
Optional.of(Bytes.of(0x00)),
storedNodeFactory,
Function.identity());
final SnapPutVisitor<Bytes> snapPutVisitor =
new SnapPutVisitor<>(storedNodeFactory, Bytes.EMPTY);
Node<Bytes> visit =
snapPutVisitor.visit(invalidBranchNode, Bytes.of(CompactEncoding.LEAF_TERMINATOR));
Assertions.assertThat(visit.isHealNeeded()).isTrue();
}
@Test
public void shouldDetectValidExtension() {
final StoredNodeFactory<Bytes> storedNodeFactory = mock(StoredNodeFactory.class);
when(storedNodeFactory.createBranch(any(), any()))
.thenReturn(
new LeafNode<>(Bytes.EMPTY, Bytes.of(0x00), storedNodeFactory, Function.identity()));
final ArrayList<Node<Bytes>> children = new ArrayList<>();
for (int i = 0; i < BranchNode.RADIX; i++) {
children.add(new StoredNode<>(storedNodeFactory, Bytes.EMPTY, Hash.ZERO));
}
final BranchNode<Bytes> invalidBranchNode =
new BranchNode<>(
Bytes.EMPTY,
children,
Optional.of(Bytes.of(0x00)),
storedNodeFactory,
Function.identity());
final SnapPutVisitor<Bytes> snapPutVisitor =
new SnapPutVisitor<>(storedNodeFactory, Bytes.EMPTY);
Node<Bytes> visit =
snapPutVisitor.visit(invalidBranchNode, Bytes.of(CompactEncoding.LEAF_TERMINATOR));
Assertions.assertThat(visit.isHealNeeded()).isFalse();
}
@Test
public void shouldDetectExtensionWithMissingChildren() {
final StoredNodeFactory<Bytes> storedNodeFactory = mock(StoredNodeFactory.class);
when(storedNodeFactory.createBranch(anyByte(), any(), anyByte(), any()))
.thenReturn(new MissingNode<>(Hash.ZERO, Bytes.EMPTY));
when(storedNodeFactory.createLeaf(any(), any()))
.thenReturn(new MissingNode<>(Hash.ZERO, Bytes.EMPTY));
final ExtensionNode<Bytes> invalidBranchNode =
new ExtensionNode<>(
Bytes.of(0x00),
new StoredNode<>(storedNodeFactory, Bytes.EMPTY, Hash.ZERO),
storedNodeFactory);
final SnapPutVisitor<Bytes> snapPutVisitor =
new SnapPutVisitor<>(storedNodeFactory, Bytes.EMPTY);
Node<Bytes> visit =
snapPutVisitor.visit(invalidBranchNode, Bytes.of(CompactEncoding.LEAF_TERMINATOR));
Assertions.assertThat(visit.isHealNeeded()).isTrue();
}
}

View File

@@ -63,6 +63,10 @@ public class InMemoryTaskQueue<T> implements TaskCollection<T> {
internalQueue.clear();
}
public void clearInternalQueue() {
internalQueue.clear();
}
@Override
public synchronized boolean allTasksCompleted() {
assertNotClosed();

View File

@@ -35,7 +35,7 @@ public class InMemoryTasksPriorityQueues<T extends TasksPriorityProvider>
clearInternalQueues();
}
private void clearInternalQueues() {
public void clearInternalQueues() {
internalQueues.clear();
for (int i = 0; i < 16; i++) {
internalQueues.add(newEmptyQueue());