init heal code (#5059)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
Co-authored-by: Simon Dudley <simon.dudley@consensys.net>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
This commit is contained in:
matkt
2023-02-08 05:30:11 +01:00
committed by GitHub
parent 38d666d8b1
commit 77b55ed7d0
44 changed files with 582 additions and 865 deletions

View File

@@ -632,6 +632,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethProtocolManager,
pivotBlockSelector);
protocolContext.setSynchronizer(Optional.of(synchronizer));
final MiningCoordinator miningCoordinator =
createMiningCoordinator(
protocolSchedule,
@@ -713,7 +715,6 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()),
ethProtocolManager,
pivotBlockSelector);
return toUse;

View File

@@ -167,7 +167,16 @@ public class MainnetBlockValidator implements BlockValidator {
return new BlockProcessingResult(
Optional.of(new BlockProcessingOutputs(worldState, receipts)));
}
} catch (StorageException | MerkleTrieException ex) {
} catch (MerkleTrieException ex) {
context
.getSynchronizer()
.ifPresentOrElse(
synchronizer -> synchronizer.healWorldState(ex.getMaybeAddress(), ex.getLocation()),
() ->
handleAndLogImportFailure(
block, new BlockProcessingResult(Optional.empty(), ex)));
return new BlockProcessingResult(Optional.empty(), ex);
} catch (StorageException ex) {
var retval = new BlockProcessingResult(Optional.empty(), ex);
handleAndLogImportFailure(block, retval);
return retval;

View File

@@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
@@ -30,6 +31,8 @@ public class ProtocolContext {
private final WorldStateArchive worldStateArchive;
private final ConsensusContext consensusContext;
private Optional<Synchronizer> synchronizer;
public ProtocolContext(
final MutableBlockchain blockchain,
final WorldStateArchive worldStateArchive,
@@ -37,6 +40,7 @@ public class ProtocolContext {
this.blockchain = blockchain;
this.worldStateArchive = worldStateArchive;
this.consensusContext = consensusContext;
this.synchronizer = Optional.empty();
}
public static ProtocolContext init(
@@ -50,6 +54,14 @@ public class ProtocolContext {
consensusContextFactory.create(blockchain, worldStateArchive, protocolSchedule));
}
public Optional<Synchronizer> getSynchronizer() {
return synchronizer;
}
public void setSynchronizer(final Optional<Synchronizer> synchronizer) {
this.synchronizer = synchronizer;
}
public MutableBlockchain getBlockchain() {
return blockchain;
}

View File

@@ -180,7 +180,7 @@ public class BonsaiAccount implements MutableAccount, EvmAccount {
@Override
public Bytes getCode() {
if (code == null) {
code = context.getCode(address).orElse(Bytes.EMPTY);
code = context.getCode(address, codeHash).orElse(Bytes.EMPTY);
}
return code;
}

View File

@@ -20,9 +20,11 @@ import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage.BonsaiStorageSubscriber;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes;
@@ -84,13 +86,19 @@ public class BonsaiInMemoryWorldState extends BonsaiPersistedWorldState
final Bytes accountKey = accountUpdate.getKey();
final BonsaiValue<BonsaiAccount> bonsaiValue = accountUpdate.getValue();
final BonsaiAccount updatedAccount = bonsaiValue.getUpdated();
if (updatedAccount == null) {
final Hash addressHash = Hash.hash(accountKey);
accountTrie.remove(addressHash);
} else {
final Hash addressHash = updatedAccount.getAddressHash();
final Bytes accountValue = updatedAccount.serializeAccount();
accountTrie.put(addressHash, accountValue);
try {
if (updatedAccount == null) {
final Hash addressHash = Hash.hash(accountKey);
accountTrie.remove(addressHash);
} else {
final Hash addressHash = updatedAccount.getAddressHash();
final Bytes accountValue = updatedAccount.serializeAccount();
accountTrie.put(addressHash, accountValue);
}
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(), Optional.of(Address.wrap(accountKey)), e.getHash(), e.getLocation());
}
}
@@ -129,10 +137,19 @@ public class BonsaiInMemoryWorldState extends BonsaiPersistedWorldState
storageAccountUpdate.getValue().entrySet()) {
final Hash keyHash = storageUpdate.getKey();
final UInt256 updatedStorage = storageUpdate.getValue().getUpdated();
if (updatedStorage == null || updatedStorage.equals(UInt256.ZERO)) {
storageTrie.remove(keyHash);
} else {
storageTrie.put(keyHash, BonsaiWorldView.encodeTrieValue(updatedStorage));
try {
if (updatedStorage == null || updatedStorage.equals(UInt256.ZERO)) {
storageTrie.remove(keyHash);
} else {
storageTrie.put(keyHash, BonsaiWorldView.encodeTrieValue(updatedStorage));
}
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(),
Optional.of(Address.wrap(updatedAddress)),
e.getHash(),
e.getLocation());
}
}

View File

@@ -14,13 +14,10 @@
*/
package org.hyperledger.besu.ethereum.bonsai;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,15 +32,8 @@ public class BonsaiInMemoryWorldStateKeyValueStorage extends BonsaiWorldStateKey
final KeyValueStorage codeStorage,
final KeyValueStorage storageStorage,
final KeyValueStorage trieBranchStorage,
final KeyValueStorage trieLogStorage,
final Optional<PeerTrieNodeFinder> fallbackNodeFinder) {
super(
accountStorage,
codeStorage,
storageStorage,
trieBranchStorage,
trieLogStorage,
fallbackNodeFinder);
final KeyValueStorage trieLogStorage) {
super(accountStorage, codeStorage, storageStorage, trieBranchStorage, trieLogStorage);
}
@Override

View File

@@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.SnapshotMutableWorldState;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.evm.worldstate.WorldState;
@@ -107,7 +108,7 @@ public class BonsaiLayeredWorldState implements MutableWorldState, BonsaiWorldVi
}
@Override
public Optional<Bytes> getCode(final Address address) {
public Optional<Bytes> getCode(final Address address, final Hash codeHash) {
BonsaiLayeredWorldState currentLayer = this;
while (currentLayer != null) {
final Optional<Bytes> maybeCode = currentLayer.trieLog.getCode(address);
@@ -124,7 +125,7 @@ public class BonsaiLayeredWorldState implements MutableWorldState, BonsaiWorldVi
} else if (currentLayer.getNextWorldView().get() instanceof BonsaiLayeredWorldState) {
currentLayer = (BonsaiLayeredWorldState) currentLayer.getNextWorldView().get();
} else {
return currentLayer.getNextWorldView().get().getCode(address);
return currentLayer.getNextWorldView().get().getCode(address, codeHash);
}
}
return Optional.empty();
@@ -291,6 +292,8 @@ public class BonsaiLayeredWorldState implements MutableWorldState, BonsaiWorldVi
new StorageException(
"Unable to copy Layered Worldstate for " + blockHash().toHexString()))) {
return new BonsaiInMemoryWorldState(archive, snapshot.getWorldStateStorage());
} catch (MerkleTrieException ex) {
throw ex; // need to throw to trigger the heal
} catch (Exception ex) {
throw new RuntimeException(ex);
}

View File

@@ -25,6 +25,7 @@ import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.evm.account.Account;
@@ -92,15 +93,14 @@ public class BonsaiPersistedWorldState implements MutableWorldState, BonsaiWorld
worldStateStorage.codeStorage,
worldStateStorage.storageStorage,
worldStateStorage.trieBranchStorage,
worldStateStorage.trieLogStorage,
getWorldStateStorage().getMaybeFallbackNodeFinder());
worldStateStorage.trieLogStorage);
return new BonsaiInMemoryWorldState(archive, bonsaiInMemoryWorldStateKeyValueStorage);
}
@Override
public Optional<Bytes> getCode(@Nonnull final Address address) {
return worldStateStorage.getCode(null, Hash.hash(address));
public Optional<Bytes> getCode(@Nonnull final Address address, final Hash codeHash) {
return worldStateStorage.getCode(codeHash, Hash.hash(address));
}
public void setArchiveStateUnSafe(final BlockHeader blockHeader) {
@@ -148,7 +148,7 @@ public class BonsaiPersistedWorldState implements MutableWorldState, BonsaiWorld
return Hash.wrap(rootHash);
}
private static void addTheAccounts(
private void addTheAccounts(
final BonsaiWorldStateKeyValueStorage.BonsaiUpdater stateUpdater,
final BonsaiWorldStateUpdater worldStateUpdater,
final StoredMerklePatriciaTrie<Bytes, Bytes> accountTrie) {
@@ -157,20 +157,26 @@ public class BonsaiPersistedWorldState implements MutableWorldState, BonsaiWorld
final Bytes accountKey = accountUpdate.getKey();
final BonsaiValue<BonsaiAccount> bonsaiValue = accountUpdate.getValue();
final BonsaiAccount updatedAccount = bonsaiValue.getUpdated();
if (updatedAccount == null) {
final Hash addressHash = Hash.hash(accountKey);
accountTrie.remove(addressHash);
stateUpdater.removeAccountInfoState(addressHash);
} else {
final Hash addressHash = updatedAccount.getAddressHash();
final Bytes accountValue = updatedAccount.serializeAccount();
stateUpdater.putAccountInfoState(Hash.hash(accountKey), accountValue);
accountTrie.put(addressHash, accountValue);
try {
if (updatedAccount == null) {
final Hash addressHash = Hash.hash(accountKey);
accountTrie.remove(addressHash);
stateUpdater.removeAccountInfoState(addressHash);
} else {
final Hash addressHash = updatedAccount.getAddressHash();
final Bytes accountValue = updatedAccount.serializeAccount();
stateUpdater.putAccountInfoState(Hash.hash(accountKey), accountValue);
accountTrie.put(addressHash, accountValue);
}
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(), Optional.of(Address.wrap(accountKey)), e.getHash(), e.getLocation());
}
}
}
private static void updateCode(
private void updateCode(
final BonsaiWorldStateKeyValueStorage.BonsaiUpdater stateUpdater,
final BonsaiWorldStateUpdater worldStateUpdater) {
for (final Map.Entry<Address, BonsaiValue<Bytes>> codeUpdate :
@@ -215,12 +221,21 @@ public class BonsaiPersistedWorldState implements MutableWorldState, BonsaiWorld
storageAccountUpdate.getValue().entrySet()) {
final Hash keyHash = storageUpdate.getKey();
final UInt256 updatedStorage = storageUpdate.getValue().getUpdated();
if (updatedStorage == null || updatedStorage.equals(UInt256.ZERO)) {
stateUpdater.removeStorageValueBySlotHash(updatedAddressHash, keyHash);
storageTrie.remove(keyHash);
} else {
stateUpdater.putStorageValueBySlotHash(updatedAddressHash, keyHash, updatedStorage);
storageTrie.put(keyHash, BonsaiWorldView.encodeTrieValue(updatedStorage));
try {
if (updatedStorage == null || updatedStorage.equals(UInt256.ZERO)) {
stateUpdater.removeStorageValueBySlotHash(updatedAddressHash, keyHash);
storageTrie.remove(keyHash);
} else {
stateUpdater.putStorageValueBySlotHash(updatedAddressHash, keyHash, updatedStorage);
storageTrie.put(keyHash, BonsaiWorldView.encodeTrieValue(updatedStorage));
}
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(),
Optional.of(Address.wrap(updatedAddress)),
e.getHash(),
e.getLocation());
}
}
@@ -259,18 +274,24 @@ public class BonsaiPersistedWorldState implements MutableWorldState, BonsaiWorld
oldAccount.getStorageRoot(),
Function.identity(),
Function.identity());
Map<Bytes32, Bytes> entriesToDelete = storageTrie.entriesFrom(Bytes32.ZERO, 256);
while (!entriesToDelete.isEmpty()) {
entriesToDelete
.keySet()
.forEach(
k -> stateUpdater.removeStorageValueBySlotHash(Hash.hash(address), Hash.wrap(k)));
entriesToDelete.keySet().forEach(storageTrie::remove);
if (entriesToDelete.size() == 256) {
entriesToDelete = storageTrie.entriesFrom(Bytes32.ZERO, 256);
} else {
break;
try {
Map<Bytes32, Bytes> entriesToDelete = storageTrie.entriesFrom(Bytes32.ZERO, 256);
while (!entriesToDelete.isEmpty()) {
entriesToDelete
.keySet()
.forEach(
k -> stateUpdater.removeStorageValueBySlotHash(Hash.hash(address), Hash.wrap(k)));
entriesToDelete.keySet().forEach(storageTrie::remove);
if (entriesToDelete.size() == 256) {
entriesToDelete = storageTrie.entriesFrom(Bytes32.ZERO, 256);
} else {
break;
}
}
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(), Optional.of(Address.wrap(address)), e.getHash(), e.getLocation());
}
}
}

View File

@@ -19,8 +19,6 @@ import static org.hyperledger.besu.util.Slf4jLambdaHelper.warnLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage.BonsaiStorageSubscriber;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.exception.StorageException;
@@ -50,25 +48,6 @@ public class BonsaiSnapshotWorldStateKeyValueStorage extends BonsaiWorldStateKey
private final AtomicBoolean shouldClose = new AtomicBoolean(false);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
public BonsaiSnapshotWorldStateKeyValueStorage(final StorageProvider snappableStorageProvider) {
this(
snappableStorageProvider
.getSnappableStorageBySegmentIdentifier(KeyValueSegmentIdentifier.ACCOUNT_INFO_STATE)
.takeSnapshot(),
snappableStorageProvider
.getSnappableStorageBySegmentIdentifier(KeyValueSegmentIdentifier.CODE_STORAGE)
.takeSnapshot(),
snappableStorageProvider
.getSnappableStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.ACCOUNT_STORAGE_STORAGE)
.takeSnapshot(),
snappableStorageProvider
.getSnappableStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_BRANCH_STORAGE)
.takeSnapshot(),
snappableStorageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.TRIE_LOG_STORAGE));
}
public BonsaiSnapshotWorldStateKeyValueStorage(
final SnappedKeyValueStorage accountStorage,
final SnappedKeyValueStorage codeStorage,
@@ -100,6 +79,12 @@ public class BonsaiSnapshotWorldStateKeyValueStorage extends BonsaiWorldStateKey
throw new StorageException("Snapshot storage does not implement clear");
}
@Override
public void clearTrieLog() {
// snapshot storage does not implement clear
throw new StorageException("Snapshot storage does not implement clear");
}
@Override
public synchronized long subscribe(final BonsaiStorageSubscriber sub) {
if (isClosed.get()) {
@@ -143,6 +128,16 @@ public class BonsaiSnapshotWorldStateKeyValueStorage extends BonsaiWorldStateKey
}
}
@Override
public void onClearTrieLog() {
// when the parent storage clears, close regardless of subscribers
try {
doClose();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public synchronized void close() throws Exception {
// when the parent storage clears, close

View File

@@ -16,7 +16,6 @@
package org.hyperledger.besu.ethereum.bonsai;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.datatypes.Hash.fromPlugin;
import static org.hyperledger.besu.ethereum.bonsai.LayeredTrieLogManager.RETAINED_LAYERS;
@@ -28,16 +27,22 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.SnapshotMutableWorldState;
import org.hyperledger.besu.ethereum.proof.WorldStateProof;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.evm.worldstate.WorldState;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
@@ -275,6 +280,9 @@ public class BonsaiWorldStateArchive implements WorldStateArchive {
LOG.debug("Archive rolling finished, now at {}", blockHash);
return Optional.of(mutableState);
} catch (final MerkleTrieException re) {
// need to throw to trigger the heal
throw re;
} catch (final Exception e) {
// if we fail we must clean up the updater
bonsaiUpdater.reset();
@@ -282,7 +290,11 @@ public class BonsaiWorldStateArchive implements WorldStateArchive {
return Optional.empty();
}
} catch (final RuntimeException re) {
LOG.error("Archive rolling failed for block hash " + blockHash, re);
LOG.trace("Archive rolling failed for block hash " + blockHash, re);
if (re instanceof MerkleTrieException) {
// need to throw to trigger the heal
throw re;
}
return Optional.empty();
}
}
@@ -302,6 +314,57 @@ public class BonsaiWorldStateArchive implements WorldStateArchive {
return persistedState;
}
public void prepareStateHealing(final Address address, final Bytes location) {
final Set<Bytes> keysToDelete = new HashSet<>();
final BonsaiWorldStateKeyValueStorage.BonsaiUpdater updater = worldStateStorage.updater();
final Hash accountHash = Hash.hash(address);
final StoredMerklePatriciaTrie<Bytes, Bytes> accountTrie =
new StoredMerklePatriciaTrie<>(
(l, h) -> {
final Optional<Bytes> node = worldStateStorage.getAccountStateTrieNode(l, h);
if (node.isPresent()) {
keysToDelete.add(l);
}
return node;
},
persistedState.worldStateRootHash,
Function.identity(),
Function.identity());
try {
accountTrie
.get(accountHash)
.map(RLP::input)
.map(StateTrieAccountValue::readFrom)
.ifPresent(
account -> {
final StoredMerklePatriciaTrie<Bytes, Bytes> storageTrie =
new StoredMerklePatriciaTrie<>(
(l, h) -> {
Optional<Bytes> node =
worldStateStorage.getAccountStorageTrieNode(accountHash, l, h);
if (node.isPresent()) {
keysToDelete.add(Bytes.concatenate(accountHash, l));
}
return node;
},
account.getStorageRoot(),
Function.identity(),
Function.identity());
try {
storageTrie.getPath(location);
} catch (Exception eA) {
LOG.warn("Invalid slot found for account {} at location {}", address, location);
// ignore
}
});
} catch (Exception eA) {
LOG.warn("Invalid node for account {} at location {}", address, location);
// ignore
}
keysToDelete.forEach(bytes -> updater.removeAccountStateTrieNode(bytes, null));
updater.commit();
}
public TrieLogManager getTrieLogManager() {
return trieLogManager;
}
@@ -324,9 +387,4 @@ public class BonsaiWorldStateArchive implements WorldStateArchive {
// FIXME we can do proofs for layered tries and the persisted trie
return Optional.empty();
}
public void useFallbackNodeFinder(final Optional<PeerTrieNodeFinder> fallbackNodeFinder) {
checkNotNull(fallbackNodeFinder);
worldStateStorage.useFallbackNodeFinder(fallbackNodeFinder);
}
}

View File

@@ -14,15 +14,12 @@
*/
package org.hyperledger.besu.ethereum.bonsai;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredNodeFactory;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
@@ -53,16 +50,13 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
protected final KeyValueStorage trieLogStorage;
protected final Subscribers<BonsaiStorageSubscriber> subscribers = Subscribers.create();
private Optional<PeerTrieNodeFinder> maybeFallbackNodeFinder;
public BonsaiWorldStateKeyValueStorage(final StorageProvider provider) {
this(
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.ACCOUNT_INFO_STATE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.CODE_STORAGE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.ACCOUNT_STORAGE_STORAGE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_BRANCH_STORAGE),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_LOG_STORAGE),
Optional.empty());
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.TRIE_LOG_STORAGE));
}
public BonsaiWorldStateKeyValueStorage(
@@ -71,33 +65,23 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
final KeyValueStorage storageStorage,
final KeyValueStorage trieBranchStorage,
final KeyValueStorage trieLogStorage) {
this(
accountStorage,
codeStorage,
storageStorage,
trieBranchStorage,
trieLogStorage,
Optional.empty());
}
public BonsaiWorldStateKeyValueStorage(
final KeyValueStorage accountStorage,
final KeyValueStorage codeStorage,
final KeyValueStorage storageStorage,
final KeyValueStorage trieBranchStorage,
final KeyValueStorage trieLogStorage,
final Optional<PeerTrieNodeFinder> fallbackNodeFinder) {
this.accountStorage = accountStorage;
this.codeStorage = codeStorage;
this.storageStorage = storageStorage;
this.trieBranchStorage = trieBranchStorage;
this.trieLogStorage = trieLogStorage;
this.maybeFallbackNodeFinder = fallbackNodeFinder;
}
@Override
public Optional<Bytes> getCode(final Bytes32 codeHash, final Hash accountHash) {
return codeStorage.get(accountHash.toArrayUnsafe()).map(Bytes::wrap);
if (codeHash.equals(Hash.EMPTY)) {
return Optional.of(Bytes.EMPTY);
} else {
return codeStorage
.get(accountHash.toArrayUnsafe())
.map(Bytes::wrap)
.filter(b -> Hash.hash(b).equals(codeHash));
}
}
public Optional<Bytes> getAccount(final Hash accountHash) {
@@ -128,17 +112,10 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE);
} else {
final Optional<Bytes> value =
trieBranchStorage.get(location.toArrayUnsafe()).map(Bytes::wrap);
if (value.isPresent()) {
return value
.filter(b -> Hash.hash(b).equals(nodeHash))
.or(
() ->
maybeFallbackNodeFinder.flatMap(
finder -> finder.getAccountStateTrieNode(location, nodeHash)));
}
return Optional.empty();
return trieBranchStorage
.get(location.toArrayUnsafe())
.map(Bytes::wrap)
.filter(b -> Hash.hash(b).equals(nodeHash));
}
}
@@ -148,20 +125,10 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) {
return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE);
} else {
final Optional<Bytes> value =
trieBranchStorage
.get(Bytes.concatenate(accountHash, location).toArrayUnsafe())
.map(Bytes::wrap);
if (value.isPresent()) {
return value
.filter(b -> Hash.hash(b).equals(nodeHash))
.or(
() ->
maybeFallbackNodeFinder.flatMap(
finder ->
finder.getAccountStorageTrieNode(accountHash, location, nodeHash)));
}
return Optional.empty();
return trieBranchStorage
.get(Bytes.concatenate(accountHash, location).toArrayUnsafe())
.map(Bytes::wrap)
.filter(b -> Hash.hash(b).equals(nodeHash));
}
}
@@ -228,11 +195,10 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
@Override
public boolean isWorldStateAvailable(final Bytes32 rootHash, final Hash blockHash) {
return trieBranchStorage
.get(WORLD_ROOT_HASH_KEY)
.map(Bytes32::wrap)
.filter(hash -> hash.equals(rootHash))
.isPresent()
|| trieLogStorage.containsKey(blockHash.toArrayUnsafe());
.get(WORLD_ROOT_HASH_KEY)
.map(Bytes32::wrap)
.map(hash -> hash.equals(rootHash) || trieLogStorage.containsKey(blockHash.toArrayUnsafe()))
.orElse(false);
}
@Override
@@ -245,6 +211,12 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
trieLogStorage.clear();
}
@Override
public void clearTrieLog() {
subscribers.forEach(BonsaiStorageSubscriber::onClearTrieLog);
trieLogStorage.clear();
}
@Override
public void clearFlatDatabase() {
subscribers.forEach(BonsaiStorageSubscriber::onClearFlatDatabaseStorage);
@@ -277,15 +249,6 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
throw new RuntimeException("removeNodeAddedListener not available");
}
public Optional<PeerTrieNodeFinder> getMaybeFallbackNodeFinder() {
return maybeFallbackNodeFinder;
}
public void useFallbackNodeFinder(final Optional<PeerTrieNodeFinder> maybeFallbackNodeFinder) {
checkNotNull(maybeFallbackNodeFinder);
this.maybeFallbackNodeFinder = maybeFallbackNodeFinder;
}
public synchronized long subscribe(final BonsaiStorageSubscriber sub) {
return subscribers.subscribe(sub);
}
@@ -456,6 +419,8 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
default void onClearFlatDatabaseStorage() {}
default void onClearTrieLog() {}
default void onCloseStorage() {}
}
}

View File

@@ -20,6 +20,7 @@ import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.evm.account.EvmAccount;
@@ -158,18 +159,25 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
protected BonsaiAccount loadAccount(
final Address address,
final Function<BonsaiValue<BonsaiAccount>, BonsaiAccount> bonsaiAccountFunction) {
final BonsaiValue<BonsaiAccount> bonsaiValue = accountsToUpdate.get(address);
if (bonsaiValue == null) {
final Account account = wrappedWorldView().get(address);
if (account instanceof BonsaiAccount) {
final BonsaiAccount mutableAccount = new BonsaiAccount((BonsaiAccount) account, this, true);
accountsToUpdate.put(address, new BonsaiValue<>((BonsaiAccount) account, mutableAccount));
return mutableAccount;
try {
final BonsaiValue<BonsaiAccount> bonsaiValue = accountsToUpdate.get(address);
if (bonsaiValue == null) {
final Account account = wrappedWorldView().get(address);
if (account instanceof BonsaiAccount) {
final BonsaiAccount mutableAccount =
new BonsaiAccount((BonsaiAccount) account, this, true);
accountsToUpdate.put(address, new BonsaiValue<>((BonsaiAccount) account, mutableAccount));
return mutableAccount;
} else {
return null;
}
} else {
return null;
return bonsaiAccountFunction.apply(bonsaiValue);
}
} else {
return bonsaiAccountFunction.apply(bonsaiValue);
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(), Optional.of(address), e.getHash(), e.getLocation());
}
}
@@ -201,7 +209,12 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
codeValue.setUpdated(null);
} else {
wrappedWorldView()
.getCode(deletedAddress)
.getCode(
deletedAddress,
Optional.ofNullable(accountValue)
.map(BonsaiValue::getPrior)
.map(BonsaiAccount::getCodeHash)
.orElse(Hash.EMPTY))
.ifPresent(
deletedCode ->
codeToUpdate.put(deletedAddress, new BonsaiValue<>(deletedCode, null)));
@@ -251,9 +264,9 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
tracked -> {
final Address updatedAddress = tracked.getAddress();
final BonsaiAccount updatedAccount;
final BonsaiValue<BonsaiAccount> updatedAccountValue =
accountsToUpdate.get(updatedAddress);
if (tracked.getWrappedAccount() == null) {
final BonsaiValue<BonsaiAccount> updatedAccountValue =
accountsToUpdate.get(updatedAddress);
updatedAccount = new BonsaiAccount(this, tracked);
tracked.setWrappedAccount(updatedAccount);
if (updatedAccountValue == null) {
@@ -281,7 +294,16 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
codeToUpdate.computeIfAbsent(
updatedAddress,
addr ->
new BonsaiValue<>(wrappedWorldView().getCode(addr).orElse(null), null));
new BonsaiValue<>(
wrappedWorldView()
.getCode(
addr,
Optional.ofNullable(updatedAccountValue)
.map(BonsaiValue::getPrior)
.map(BonsaiAccount::getCodeHash)
.orElse(Hash.EMPTY))
.orElse(null),
null));
pendingCode.setUpdated(updatedAccount.getCode());
}
// This is especially to avoid unnecessary computation for withdrawals
@@ -331,10 +353,10 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
}
@Override
public Optional<Bytes> getCode(final Address address) {
public Optional<Bytes> getCode(final Address address, final Hash codeHash) {
final BonsaiValue<Bytes> localCode = codeToUpdate.get(address);
if (localCode == null) {
return wrappedWorldView().getCode(address);
return wrappedWorldView().getCode(address, codeHash);
} else {
return Optional.ofNullable(localCode.getUpdated());
}
@@ -366,29 +388,35 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
if (emptySlot.contains(slot)) {
return Optional.empty();
} else {
final Optional<UInt256> valueUInt =
(wrappedWorldView() instanceof BonsaiPersistedWorldState)
? ((BonsaiPersistedWorldState) wrappedWorldView())
.getStorageValueBySlotHash(
() ->
Optional.ofNullable(loadAccount(address, BonsaiValue::getPrior))
.map(BonsaiAccount::getStorageRoot),
address,
slotHash)
: wrappedWorldView().getStorageValueBySlotHash(address, slotHash);
valueUInt.ifPresentOrElse(
v ->
storageToUpdate
.computeIfAbsent(
address,
key ->
new StorageConsumingMap<>(
address, new ConcurrentHashMap<>(), storagePreloader))
.put(slotHash, new BonsaiValue<>(v, v)),
() -> {
emptySlot.add(Bytes.concatenate(Hash.hash(address), slotHash));
});
return valueUInt;
try {
final Optional<UInt256> valueUInt =
(wrappedWorldView() instanceof BonsaiPersistedWorldState)
? ((BonsaiPersistedWorldState) wrappedWorldView())
.getStorageValueBySlotHash(
() ->
Optional.ofNullable(loadAccount(address, BonsaiValue::getPrior))
.map(BonsaiAccount::getStorageRoot),
address,
slotHash)
: wrappedWorldView().getStorageValueBySlotHash(address, slotHash);
valueUInt.ifPresentOrElse(
v ->
storageToUpdate
.computeIfAbsent(
address,
key ->
new StorageConsumingMap<>(
address, new ConcurrentHashMap<>(), storagePreloader))
.put(slotHash, new BonsaiValue<>(v, v)),
() -> {
emptySlot.add(Bytes.concatenate(Hash.hash(address), slotHash));
});
return valueUInt;
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(), Optional.of(address), e.getHash(), e.getLocation());
}
}
}
@@ -580,15 +608,21 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
private BonsaiValue<BonsaiAccount> loadAccountFromParent(
final Address address, final BonsaiValue<BonsaiAccount> defaultValue) {
final Account parentAccount = wrappedWorldView().get(address);
if (parentAccount instanceof BonsaiAccount) {
final BonsaiAccount account = (BonsaiAccount) parentAccount;
final BonsaiValue<BonsaiAccount> loadedAccountValue =
new BonsaiValue<>(new BonsaiAccount(account), account);
accountsToUpdate.put(address, loadedAccountValue);
return loadedAccountValue;
} else {
return defaultValue;
try {
final Account parentAccount = wrappedWorldView().get(address);
if (parentAccount instanceof BonsaiAccount) {
final BonsaiAccount account = (BonsaiAccount) parentAccount;
final BonsaiValue<BonsaiAccount> loadedAccountValue =
new BonsaiValue<>(new BonsaiAccount(account), account);
accountsToUpdate.put(address, loadedAccountValue);
return loadedAccountValue;
} else {
return defaultValue;
}
} catch (MerkleTrieException e) {
// need to throw to trigger the heal
throw new MerkleTrieException(
e.getMessage(), Optional.of(address), e.getHash(), e.getLocation());
}
}
@@ -600,7 +634,11 @@ public class BonsaiWorldStateUpdater extends AbstractWorldUpdater<BonsaiWorldVie
}
BonsaiValue<Bytes> codeValue = codeToUpdate.get(address);
if (codeValue == null) {
final Bytes storedCode = wrappedWorldView().getCode(address).orElse(Bytes.EMPTY);
final Bytes storedCode =
wrappedWorldView()
.getCode(
address, Optional.ofNullable(expectedCode).map(Hash::hash).orElse(Hash.EMPTY))
.orElse(Bytes.EMPTY);
if (!storedCode.isEmpty()) {
codeValue = new BonsaiValue<>(storedCode, storedCode);
codeToUpdate.put(address, codeValue);

View File

@@ -30,7 +30,7 @@ import org.apache.tuweni.units.bigints.UInt256;
public interface BonsaiWorldView extends WorldView {
Optional<Bytes> getCode(Address address);
Optional<Bytes> getCode(Address address, final Hash codeHash);
Optional<Bytes> getStateTrieNode(Bytes location);

View File

@@ -105,6 +105,11 @@ public class SnapshotTrieLogManager extends AbstractTrieLogManager<BonsaiSnapsho
dropArchive();
}
@Override
public void onClearTrieLog() {
dropArchive();
}
private void dropArchive() {
// drop all cached snapshot worldstates, they are unsafe when the db has been truncated
LOG.info("Key-value storage truncated, dropping cached worldstates");

View File

@@ -14,12 +14,15 @@
*/
package org.hyperledger.besu.ethereum.core;
import org.hyperledger.besu.plugin.data.Address;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes;
/** Provides an interface to block synchronization processes. */
public interface Synchronizer {
@@ -42,6 +45,8 @@ public interface Synchronizer {
boolean resyncWorldState();
boolean healWorldState(final Optional<Address> maybeAccountToRepair, final Bytes location);
long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener);
boolean unsubscribeSyncStatus(long observerId);

View File

@@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
import org.hyperledger.besu.ethereum.processing.TransactionProcessingResult;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.ethereum.vm.BlockHashLookup;
import org.hyperledger.besu.evm.tracing.OperationTracer;
import org.hyperledger.besu.evm.worldstate.WorldState;
@@ -162,6 +163,12 @@ public abstract class AbstractBlockProcessor implements BlockProcessor {
try {
worldState.persist(blockHeader);
} catch (MerkleTrieException e) {
LOG.trace("Merkle trie exception during Transaction processing ", e);
if (worldState instanceof BonsaiPersistedWorldState) {
((BonsaiWorldStateUpdater) worldState.updater()).reset();
}
throw e;
} catch (Exception e) {
LOG.error("failed persisting block", e);
return new BlockProcessingResult(Optional.empty(), e);

View File

@@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
import org.hyperledger.besu.ethereum.processing.TransactionProcessingResult;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.ethereum.vm.BlockHashLookup;
import org.hyperledger.besu.ethereum.worldstate.GoQuorumMutablePrivateWorldStateUpdater;
import org.hyperledger.besu.evm.AccessListEntry;
@@ -473,6 +474,9 @@ public class MainnetTransactionProcessor {
return TransactionProcessingResult.failed(
gasUsedByTransaction, refunded, validationResult, initialFrame.getRevertReason());
}
} catch (final MerkleTrieException re) {
// need to throw to trigger the heal
throw re;
} catch (final RuntimeException re) {
LOG.error("Critical Exception Processing Transaction", re);
return TransactionProcessingResult.invalid(

View File

@@ -97,6 +97,11 @@ public class WorldStateKeyValueStorage implements WorldStateStorage {
keyValueStorage.clear();
}
@Override
public void clearTrieLog() {
// nothing to do for forest
}
@Override
public void clearFlatDatabase() {
// nothing to do for forest

View File

@@ -44,6 +44,8 @@ public interface WorldStateStorage {
void clear();
void clearTrieLog();
void clearFlatDatabase();
Updater updater();

View File

@@ -18,11 +18,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage.WORLD_ROOT_HASH_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@@ -32,10 +30,8 @@ import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StorageEntriesCollector;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.tuweni.bytes.Bytes;
@@ -48,7 +44,7 @@ public class BonsaiWorldStateKeyValueStorageTest {
@Test
public void getCode_returnsEmpty() {
final BonsaiWorldStateKeyValueStorage storage = emptyStorage();
assertThat(storage.getCode(null, Hash.EMPTY)).isEmpty();
assertThat(storage.getCode(Hash.EMPTY, Hash.EMPTY)).contains(Bytes.EMPTY);
}
@Test
@@ -89,7 +85,8 @@ public class BonsaiWorldStateKeyValueStorageTest {
.putCode(Hash.EMPTY, Bytes.EMPTY)
.commit();
assertThat(storage.getCode(null, Hash.EMPTY)).contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
assertThat(storage.getCode(Hash.hash(MerklePatriciaTrie.EMPTY_TRIE_NODE), Hash.EMPTY))
.contains(MerklePatriciaTrie.EMPTY_TRIE_NODE);
}
@Test
@@ -98,7 +95,7 @@ public class BonsaiWorldStateKeyValueStorageTest {
final BonsaiWorldStateKeyValueStorage storage = emptyStorage();
storage.updater().putCode(Hash.EMPTY, bytes).commit();
assertThat(storage.getCode(null, Hash.EMPTY)).contains(bytes);
assertThat(storage.getCode(Hash.hash(bytes), Hash.EMPTY)).contains(bytes);
}
@Test
@@ -255,9 +252,8 @@ public class BonsaiWorldStateKeyValueStorageTest {
updaterA.commit();
updaterB.commit();
assertThat(storage.getCode(null, accountHashA)).contains(bytesA);
assertThat(storage.getCode(null, accountHashB)).contains(bytesB);
assertThat(storage.getCode(null, accountHashD)).contains(bytesC);
assertThat(storage.getCode(Hash.hash(bytesB), accountHashB)).contains(bytesB);
assertThat(storage.getCode(Hash.hash(bytesC), accountHashD)).contains(bytesC);
}
@Test
@@ -295,62 +291,6 @@ public class BonsaiWorldStateKeyValueStorageTest {
assertThat(storage.isWorldStateAvailable(Bytes32.wrap(nodeHashKey), Hash.EMPTY)).isTrue();
}
@Test
public void getAccountStateTrieNode_callFallbackMechanismForInvalidNode() {
PeerTrieNodeFinder peerTrieNodeFinder = mock(PeerTrieNodeFinder.class);
final Bytes location = Bytes.fromHexString("0x01");
final Bytes bytesInDB = Bytes.fromHexString("0x123456");
final Hash hashToFind = Hash.hash(Bytes.of(1));
final Bytes bytesToFind = Bytes.fromHexString("0x123457");
final BonsaiWorldStateKeyValueStorage storage = emptyStorage();
when(peerTrieNodeFinder.getAccountStateTrieNode(location, hashToFind))
.thenReturn(Optional.of(bytesToFind));
storage.useFallbackNodeFinder(Optional.of(peerTrieNodeFinder));
storage.updater().putAccountStateTrieNode(location, Hash.hash(bytesInDB), bytesInDB).commit();
Optional<Bytes> accountStateTrieNodeResult =
storage.getAccountStateTrieNode(location, hashToFind);
verify(peerTrieNodeFinder).getAccountStateTrieNode(location, hashToFind);
assertThat(accountStateTrieNodeResult).contains(bytesToFind);
}
@Test
public void getAccountStorageTrieNode_callFallbackMechanismForInvalidNode() {
PeerTrieNodeFinder peerTrieNodeFinder = mock(PeerTrieNodeFinder.class);
final Hash account = Hash.hash(Bytes32.ZERO);
final Bytes location = Bytes.fromHexString("0x01");
final Bytes bytesInDB = Bytes.fromHexString("0x123456");
final Hash hashToFind = Hash.hash(Bytes.of(1));
final Bytes bytesToFind = Bytes.fromHexString("0x123457");
final BonsaiWorldStateKeyValueStorage storage = emptyStorage();
when(peerTrieNodeFinder.getAccountStorageTrieNode(account, location, hashToFind))
.thenReturn(Optional.of(bytesToFind));
storage.useFallbackNodeFinder(Optional.of(peerTrieNodeFinder));
storage
.updater()
.putAccountStorageTrieNode(account, location, Hash.hash(bytesInDB), bytesInDB)
.commit();
Optional<Bytes> accountStateTrieNodeResult =
storage.getAccountStorageTrieNode(account, location, hashToFind);
verify(peerTrieNodeFinder).getAccountStorageTrieNode(account, location, hashToFind);
assertThat(accountStateTrieNodeResult).contains(bytesToFind);
}
private BonsaiWorldStateKeyValueStorage emptyStorage() {
return new BonsaiWorldStateKeyValueStorage(new InMemoryKeyValueStorageProvider());
}

View File

@@ -15,12 +15,12 @@
package org.hyperledger.besu.ethereum.eth.sync;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.consensus.merge.UnverifiedForkchoiceListener;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateArchive;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory;
@@ -33,25 +33,27 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapPersistedContext;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStatePeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.ethereum.worldstate.Pruner;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.data.Address;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.log.FramedLogMessage;
import java.nio.file.Path;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,14 +65,10 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final Optional<BlockPropagationManager> blockPropagationManager;
private final Function<Boolean, Optional<FastSyncDownloader<?>>> fastSyncFactory;
private final Supplier<Optional<FastSyncDownloader<?>>> fastSyncFactory;
private Optional<FastSyncDownloader<?>> fastSyncDownloader;
private final Optional<FullSyncDownloader> fullSyncDownloader;
private final EthContext ethContext;
private final ProtocolContext protocolContext;
private final ProtocolManager protocolManager;
private final WorldStateStorage worldStateStorage;
private final MetricsSystem metricsSystem;
private final PivotBlockSelector pivotBlockSelector;
private final SyncTerminationCondition terminationCondition;
@@ -88,16 +86,11 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
final Clock clock,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
final ProtocolManager protocolManager,
final PivotBlockSelector pivotBlockSelector) {
this.maybePruner = maybePruner;
this.syncState = syncState;
this.protocolManager = protocolManager;
this.pivotBlockSelector = pivotBlockSelector;
this.ethContext = ethContext;
this.protocolContext = protocolContext;
this.worldStateStorage = worldStateStorage;
this.metricsSystem = metricsSystem;
this.terminationCondition = terminationCondition;
ChainHeadTracker.trackChainHeadForPeers(
@@ -136,7 +129,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
if (SyncMode.FAST.equals(syncConfig.getSyncMode())) {
this.fastSyncFactory =
(isResync) ->
() ->
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
@@ -147,11 +140,10 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
ethContext,
worldStateStorage,
syncState,
clock,
isResync);
clock);
} else if (SyncMode.X_CHECKPOINT.equals(syncConfig.getSyncMode())) {
this.fastSyncFactory =
(isResync) ->
() ->
CheckpointDownloaderFactory.createCheckpointDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
@@ -163,11 +155,10 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
ethContext,
worldStateStorage,
syncState,
clock,
isResync);
clock);
} else {
this.fastSyncFactory =
(isResync) ->
() ->
SnapDownloaderFactory.createSnapDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
@@ -179,12 +170,11 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
ethContext,
worldStateStorage,
syncState,
clock,
isResync);
clock);
}
// create a non-resync fast sync downloader:
this.fastSyncDownloader = this.fastSyncFactory.apply(false);
this.fastSyncDownloader = this.fastSyncFactory.get();
metricsSystem.createLongGauge(
BesuMetricCategory.ETHEREUM,
@@ -222,7 +212,6 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
future = fastSyncDownloader.get().start().thenCompose(this::handleSyncResult);
} else {
syncState.markInitialSyncPhaseAsDone();
enableFallbackNodeFinder();
future = startFullSync();
}
return future.thenApply(this::finalizeSync);
@@ -271,8 +260,6 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
pivotBlockSelector.close();
syncState.markInitialSyncPhaseAsDone();
enableFallbackNodeFinder();
if (terminationCondition.shouldContinueDownload()) {
return startFullSync();
} else {
@@ -281,19 +268,6 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
}
}
private void enableFallbackNodeFinder() {
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
final Optional<PeerTrieNodeFinder> fallbackNodeFinder =
Optional.of(
new WorldStatePeerTrieNodeFinder(
ethContext, protocolManager, protocolContext.getBlockchain(), metricsSystem));
((BonsaiWorldStateArchive) protocolContext.getWorldStateArchive())
.useFallbackNodeFinder(fallbackNodeFinder);
((BonsaiWorldStateKeyValueStorage) worldStateStorage)
.useFallbackNodeFinder(fallbackNodeFinder);
}
}
private CompletableFuture<Void> startFullSync() {
maybePruner.ifPresent(Pruner::start);
return fullSyncDownloader
@@ -322,10 +296,41 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
stop();
fastSyncDownloader.get().deleteFastSyncState();
}
// recreate fast sync with resync and start
this.syncState.markInitialSyncRestart();
this.fastSyncDownloader = this.fastSyncFactory.apply(true);
this.syncState.markResyncNeeded();
this.fastSyncDownloader = this.fastSyncFactory.get();
start();
return true;
}
@Override
public boolean healWorldState(
final Optional<Address> maybeAccountToRepair, final Bytes location) {
// recreate fast sync with resync and start
if (fastSyncDownloader.isPresent() && running.get()) {
stop();
fastSyncDownloader.get().deleteFastSyncState();
}
final List<String> lines = new ArrayList<>();
lines.add("Besu has identified a problem with its worldstate database.");
lines.add("Your node will fetch the correct data from peers to repair the problem.");
lines.add("Starting the sync pipeline...");
infoLambda(LOG, FramedLogMessage.generate(lines));
this.syncState.markInitialSyncRestart();
this.syncState.markResyncNeeded();
maybeAccountToRepair.ifPresent(
address -> {
if (this.protocolContext.getWorldStateArchive() instanceof BonsaiWorldStateArchive) {
((BonsaiWorldStateArchive) this.protocolContext.getWorldStateArchive())
.prepareStateHealing(
org.hyperledger.besu.datatypes.Address.wrap(address), location);
}
this.syncState.markAccountToRepair(maybeAccountToRepair);
});
this.fastSyncDownloader = this.fastSyncFactory.get();
start();
return true;
}

View File

@@ -157,6 +157,7 @@ public class PipelineChainDownloader implements ChainDownloader {
if (!syncTargetManager.shouldContinueDownloading()) {
return CompletableFuture.completedFuture(null);
}
syncState.setSyncTarget(target.peer(), target.commonAncestor());
debugLambda(
LOG,

View File

@@ -59,10 +59,12 @@ public class BackwardSyncStep {
@VisibleForTesting
protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
if (context.getProtocolContext().getBlockchain().contains(hash)) {
final Optional<BlockHeader> blockHeader =
context.getProtocolContext().getBlockchain().getBlockHeader(hash);
if (blockHeader.isPresent()) {
LOG.debug(
"Hash {} already present in local blockchain no need to request headers to peers", hash);
return CompletableFuture.completedFuture(List.of());
return CompletableFuture.completedFuture(List.of(blockHeader.get()));
}
final int batchSize = context.getBatchSize();
@@ -104,8 +106,9 @@ public class BackwardSyncStep {
saveHeader(blockHeader);
}
logProgress(blockHeaders.get(blockHeaders.size() - 1).getNumber());
if (!blockHeaders.isEmpty()) {
logProgress(blockHeaders.get(blockHeaders.size() - 1).getNumber());
}
return null;
}

View File

@@ -84,11 +84,13 @@ public class BackwardsSyncAlgorithm implements BesuEvents.InitialSyncCompletionL
final MutableBlockchain blockchain = context.getProtocolContext().getBlockchain();
final BlockHeader firstAncestorHeader = maybeFirstAncestorHeader.get();
if (blockchain.contains(firstAncestorHeader.getHash())) {
final BlockHeader chainHeader = blockchain.getChainHeadHeader();
if (blockchain.contains(firstAncestorHeader.getHash())
&& firstAncestorHeader.getNumber() <= chainHeader.getNumber()) {
return executeProcessKnownAncestors();
}
if (blockchain.getChainHead().getHeight() > firstAncestorHeader.getNumber()) {
if (chainHeader.getNumber() > firstAncestorHeader.getNumber()) {
debugLambda(
LOG,
"Backward reached below current chain head {} : {}",

View File

@@ -44,7 +44,9 @@ public interface FinalBlockConfirmation {
}
static FinalBlockConfirmation ancestorConfirmation(final MutableBlockchain blockchain) {
return firstHeader -> blockchain.contains(firstHeader.getParentHash());
return firstHeader ->
blockchain.contains(firstHeader.getParentHash())
&& blockchain.getChainHeadBlockNumber() + 1 >= firstHeader.getNumber();
}
static FinalBlockConfirmation confirmationChain(final FinalBlockConfirmation... confirmations) {

View File

@@ -105,11 +105,11 @@ public class ForwardSyncStep {
}
for (Block block : blocks) {
final Optional<Block> parent =
final Optional<BlockHeader> parent =
context
.getProtocolContext()
.getBlockchain()
.getBlockByHash(block.getHeader().getParentHash());
.getBlockHeader(block.getHeader().getParentHash());
if (parent.isEmpty()) {
context.halveBatchSize();

View File

@@ -20,8 +20,10 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import com.google.common.annotations.VisibleForTesting;
@@ -47,17 +49,33 @@ public class ProcessKnownAncestorsStep {
protected void processKnownAncestors() {
while (backwardChain.getFirstAncestorHeader().isPresent()) {
BlockHeader header = backwardChain.getFirstAncestorHeader().orElseThrow();
if (context.getProtocolContext().getBlockchain().contains(header.getHash())) {
final long chainHeadBlockNumber =
context.getProtocolContext().getBlockchain().getChainHeadBlockNumber();
boolean isFirstUnProcessedHeader = true;
if (context.getProtocolContext().getBlockchain().contains(header.getHash())
&& header.getNumber() <= chainHeadBlockNumber) {
debugLambda(
LOG,
"Block {} is already imported, we can ignore it for the sync process",
() -> header.toLogString());
header::toLogString);
backwardChain.dropFirstHeader();
} else if (context.getProtocolContext().getBlockchain().contains(header.getParentHash())
&& backwardChain.isTrusted(header.getHash())) {
debugLambda(LOG, "Importing trusted block {}", header::toLogString);
context.saveBlock(backwardChain.getTrustedBlock(header.getHash()));
} else {
isFirstUnProcessedHeader = false;
} else if (context.getProtocolContext().getBlockchain().contains(header.getParentHash())) {
final boolean isTrustedBlock = backwardChain.isTrusted(header.getHash());
final Optional<Block> block =
isTrustedBlock
? Optional.of(backwardChain.getTrustedBlock(header.getHash()))
: context.getProtocolContext().getBlockchain().getBlockByHash(header.getHash());
if (block.isPresent()) {
debugLambda(LOG, "Importing block {}", header::toLogString);
context.saveBlock(block.get());
if (isTrustedBlock) {
backwardChain.dropFirstHeader();
isFirstUnProcessedHeader = false;
}
}
}
if (isFirstUnProcessedHeader) {
debugLambda(LOG, "First unprocessed header is {}", header::toLogString);
return;
}

View File

@@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
@@ -60,8 +61,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final SyncState syncState,
final Clock clock,
final boolean isResync) {
final Clock clock) {
final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER);
final FastSyncStateStorage fastSyncStateStorage =
@@ -81,13 +81,12 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
final FastSyncState fastSyncState =
fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
if (isResync) {
if (syncState.isResyncNeeded()) {
snapContext.clear();
worldStateStorage.clear();
}
if (!isResync
&& fastSyncState.getPivotBlockHeader().isEmpty()
syncState
.getAccountToRepair()
.ifPresent(address -> snapContext.addInconsistentAccount(Hash.hash(address)));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
!= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info(
@@ -109,10 +108,12 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
pivotBlockSelector,
metricsSystem);
} else {
LOG.info(
"Checkpoint sync start with block {} and hash {}",
syncState.getCheckpoint().get().blockNumber(),
syncState.getCheckpoint().get().blockHash());
if (!syncState.isResyncNeeded()) {
LOG.info(
"Checkpoint sync start with block {} and hash {}",
syncState.getCheckpoint().get().blockNumber(),
syncState.getCheckpoint().get().blockHash());
}
fastSyncActions =
new CheckpointSyncActions(
syncConfig,

View File

@@ -85,6 +85,7 @@ public class FastSyncDownloader<REQUEST> {
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
LOG.info("Clearing bonsai flat account db");
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
}
LOG.debug("Start sync with initial sync state {}", fastSyncState);
return findPivotBlock(fastSyncState, fss -> downloadChainAndWorldState(fastSyncActions, fss));

View File

@@ -167,8 +167,16 @@ public class FastSyncTargetManager extends SyncTargetManager {
@Override
public boolean shouldContinueDownloading() {
final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get();
return !protocolContext.getBlockchain().getChainHeadHash().equals(pivotBlockHeader.getHash())
|| !worldStateStorage.isWorldStateAvailable(
pivotBlockHeader.getStateRoot(), pivotBlockHeader.getBlockHash());
boolean isValidChainHead =
protocolContext.getBlockchain().getChainHeadHash().equals(pivotBlockHeader.getHash());
if (!isValidChainHead) {
if (protocolContext.getBlockchain().contains(pivotBlockHeader.getHash())) {
protocolContext.getBlockchain().rewindToBlock(pivotBlockHeader.getHash());
} else {
return true;
}
}
return !worldStateStorage.isWorldStateAvailable(
pivotBlockHeader.getStateRoot(), pivotBlockHeader.getBlockHash());
}
}

View File

@@ -61,8 +61,7 @@ public class FastDownloaderFactory {
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final SyncState syncState,
final Clock clock,
final boolean isResync) {
final Clock clock) {
final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER);
final FastSyncStateStorage fastSyncStateStorage =
@@ -82,9 +81,8 @@ public class FastDownloaderFactory {
final FastSyncState fastSyncState =
fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
if (isResync) {
worldStateStorage.clear();
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
if (!syncState.isResyncNeeded()
&& fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
!= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info(

View File

@@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
@@ -56,8 +57,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final SyncState syncState,
final Clock clock,
final boolean isResync) {
final Clock clock) {
final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER);
final FastSyncStateStorage fastSyncStateStorage =
@@ -77,13 +77,12 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
final FastSyncState fastSyncState =
fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
if (isResync) {
if (syncState.isResyncNeeded()) {
snapContext.clear();
worldStateStorage.clear();
}
if (!isResync
&& fastSyncState.getPivotBlockHeader().isEmpty()
syncState
.getAccountToRepair()
.ifPresent(address -> snapContext.addInconsistentAccount(Hash.hash(address)));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
!= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info(

View File

@@ -41,7 +41,7 @@ public class SnapPersistedContext {
private final GenericKeyValueStorageFacade<BigInteger, AccountRangeDataRequest>
accountRangeToDownload;
private final GenericKeyValueStorageFacade<BigInteger, Bytes> inconsistentAccounts;
private final GenericKeyValueStorageFacade<BigInteger, Bytes> healContext;
public SnapPersistedContext(final StorageProvider storageProvider) {
this.accountRangeToDownload =
@@ -61,7 +61,7 @@ public class SnapPersistedContext {
},
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.SNAPSYNC_MISSING_ACCOUNT_RANGE));
this.inconsistentAccounts =
this.healContext =
new GenericKeyValueStorageFacade<>(
BigInteger::toByteArray,
new ValueConvertor<>() {
@@ -95,25 +95,25 @@ public class SnapPersistedContext {
public void addInconsistentAccount(final Bytes inconsistentAccount) {
final BigInteger index =
inconsistentAccounts
healContext
.get(SNAP_INCONSISTENT_ACCOUNT_INDEX)
.map(bytes -> new BigInteger(bytes.toArrayUnsafe()).add(BigInteger.ONE))
.orElse(BigInteger.ZERO);
inconsistentAccounts.putAll(
healContext.putAll(
keyValueStorageTransaction -> {
keyValueStorageTransaction.put(SNAP_INCONSISTENT_ACCOUNT_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), inconsistentAccount.toArrayUnsafe());
});
}
public List<AccountRangeDataRequest> getPersistedTasks() {
public List<AccountRangeDataRequest> getCurrentAccountRange() {
return accountRangeToDownload
.streamValuesFromKeysThat(bytes -> true)
.collect(Collectors.toList());
}
public HashSet<Bytes> getInconsistentAccounts() {
return inconsistentAccounts
return healContext
.streamValuesFromKeysThat(notEqualsTo(SNAP_INCONSISTENT_ACCOUNT_INDEX))
.collect(Collectors.toCollection(HashSet::new));
}
@@ -124,12 +124,12 @@ public class SnapPersistedContext {
public void clear() {
accountRangeToDownload.clear();
inconsistentAccounts.clear();
healContext.clear();
}
public void close() throws IOException {
accountRangeToDownload.close();
inconsistentAccounts.close();
healContext.close();
}
private Predicate<byte[]> notEqualsTo(final byte[] name) {

View File

@@ -198,6 +198,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
public synchronized void reloadHeal() {
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
pendingTrieNodeRequests.clear();
pendingCodeRequests.clear();
snapSyncState.setHealStatus(false);

View File

@@ -147,21 +147,24 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
snapsyncMetricsManager.initRange(ranges);
final List<AccountRangeDataRequest> persistedTasks = snapContext.getPersistedTasks();
final List<AccountRangeDataRequest> currentAccountRange =
snapContext.getCurrentAccountRange();
final HashSet<Bytes> inconsistentAccounts = snapContext.getInconsistentAccounts();
if (!persistedTasks.isEmpty()) { // continue to download worldstate ranges
if (!currentAccountRange.isEmpty()) { // continue to download worldstate ranges
newDownloadState.setInconsistentAccounts(inconsistentAccounts);
snapContext
.getPersistedTasks()
.getCurrentAccountRange()
.forEach(
snapDataRequest -> {
snapsyncMetricsManager.notifyStateDownloaded(
snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash());
newDownloadState.enqueueRequest(snapDataRequest);
});
} else if (!inconsistentAccounts.isEmpty()) { // restart only the heal step
} else if (!snapContext.getInconsistentAccounts().isEmpty()) { // restart only the heal step
snapSyncState.setHealStatus(true);
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
newDownloadState.setInconsistentAccounts(inconsistentAccounts);
newDownloadState.enqueueRequest(
SnapDataRequest.createAccountTrieNodeDataRequest(

View File

@@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadStatus;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.data.Address;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.InitialSyncCompletionListener;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
@@ -58,6 +59,10 @@ public class SyncState {
private final Optional<Checkpoint> checkpoint;
private volatile boolean isInitialSyncPhaseDone;
private volatile boolean isResyncNeeded;
private Optional<Address> maybeAccountToRepair;
public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
this(blockchain, ethPeers, false, Optional.empty());
}
@@ -311,13 +316,30 @@ public class SyncState {
return checkpoint;
}
public boolean isInitialSyncPhaseDone() {
return isInitialSyncPhaseDone;
}
public void markInitialSyncPhaseAsDone() {
isInitialSyncPhaseDone = true;
isResyncNeeded = false;
completionListenerSubscribers.forEach(InitialSyncCompletionListener::onInitialSyncCompleted);
}
public boolean isInitialSyncPhaseDone() {
return isInitialSyncPhaseDone;
public boolean isResyncNeeded() {
return isResyncNeeded;
}
public void markResyncNeeded() {
isResyncNeeded = true;
}
public Optional<Address> getAccountToRepair() {
return maybeAccountToRepair;
}
public void markAccountToRepair(final Optional<Address> address) {
maybeAccountToRepair = address;
}
public void markInitialSyncRestart() {

View File

@@ -1,172 +0,0 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.worldstate;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.EthProtocolVersion;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetTrieNodeFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetNodeDataFromPeerTask;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.worldstate.PeerTrieNodeFinder;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** This class is used to retrieve missing nodes in the trie by querying the peers */
public class WorldStatePeerTrieNodeFinder implements PeerTrieNodeFinder {
private static final Logger LOG = LoggerFactory.getLogger(WorldStatePeerTrieNodeFinder.class);
private final Cache<Bytes32, Bytes> foundNodes =
CacheBuilder.newBuilder().maximumSize(10_000).expireAfterWrite(5, TimeUnit.MINUTES).build();
private static final long TIMEOUT_SECONDS = 1;
final ProtocolManager protocolManager;
final EthContext ethContext;
final Blockchain blockchain;
final MetricsSystem metricsSystem;
public WorldStatePeerTrieNodeFinder(
final EthContext ethContext,
final ProtocolManager protocolManager,
final Blockchain blockchain,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.protocolManager = protocolManager;
this.blockchain = blockchain;
this.metricsSystem = metricsSystem;
}
@Override
public Optional<Bytes> getAccountStateTrieNode(final Bytes location, final Bytes32 nodeHash) {
Optional<Bytes> cachedValue = Optional.ofNullable(foundNodes.getIfPresent(nodeHash));
if (cachedValue.isPresent()) {
return cachedValue;
}
final Optional<Bytes> response =
findByGetNodeData(Hash.wrap(nodeHash))
.or(() -> findByGetTrieNodeData(Hash.wrap(nodeHash), Optional.empty(), location));
response.ifPresent(
bytes -> {
LOG.debug(
"Fixed missing account state trie node for location {} and hash {}",
location,
nodeHash);
foundNodes.put(nodeHash, bytes);
});
return response;
}
@Override
public Optional<Bytes> getAccountStorageTrieNode(
final Hash accountHash, final Bytes location, final Bytes32 nodeHash) {
Optional<Bytes> cachedValue = Optional.ofNullable(foundNodes.getIfPresent(nodeHash));
if (cachedValue.isPresent()) {
return cachedValue;
}
final Optional<Bytes> response =
// Call findByGetNodeData only if protocol version < eth 67
(protocolManager.getHighestProtocolVersion() < EthProtocolVersion.V67
? findByGetNodeData(Hash.wrap(nodeHash))
: Optional.<Bytes>empty())
.or(
() ->
findByGetTrieNodeData(Hash.wrap(nodeHash), Optional.of(accountHash), location));
response.ifPresent(
bytes -> {
LOG.debug(
"Fixed missing storage state trie node for location {} and hash {}",
location,
nodeHash);
foundNodes.put(nodeHash, bytes);
});
return response;
}
@VisibleForTesting
public Optional<Bytes> findByGetNodeData(final Hash nodeHash) {
if (protocolManager.getHighestProtocolVersion() >= EthProtocolVersion.V67) {
return Optional.empty();
}
final BlockHeader chainHead = blockchain.getChainHeadHeader();
final RetryingGetNodeDataFromPeerTask retryingGetNodeDataFromPeerTask =
RetryingGetNodeDataFromPeerTask.forHashes(
ethContext, List.of(nodeHash), chainHead.getNumber(), metricsSystem);
try {
final Map<Hash, Bytes> response =
retryingGetNodeDataFromPeerTask.run().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (response.containsKey(nodeHash)) {
LOG.debug("Found node {} with getNodeData request", nodeHash);
return Optional.of(response.get(nodeHash));
} else {
LOG.debug("Found invalid node {} with getNodeData request", nodeHash);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.debug("Error when trying to find node {} with getNodeData request", nodeHash);
}
return Optional.empty();
}
@VisibleForTesting
public Optional<Bytes> findByGetTrieNodeData(
final Hash nodeHash, final Optional<Bytes32> accountHash, final Bytes location) {
final BlockHeader chainHead = blockchain.getChainHeadHeader();
final Map<Bytes, List<Bytes>> request = new HashMap<>();
if (accountHash.isPresent()) {
request.put(accountHash.get(), List.of(CompactEncoding.encode(location)));
} else {
request.put(CompactEncoding.encode(location), new ArrayList<>());
}
final Bytes path = CompactEncoding.encode(location);
final EthTask<Map<Bytes, Bytes>> getTrieNodeFromPeerTask =
RetryingGetTrieNodeFromPeerTask.forTrieNodes(ethContext, request, chainHead, metricsSystem);
try {
final Map<Bytes, Bytes> response =
getTrieNodeFromPeerTask.run().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
final Bytes nodeValue =
response.get(Bytes.concatenate(accountHash.map(Bytes::wrap).orElse(Bytes.EMPTY), path));
if (nodeValue != null && Hash.hash(nodeValue).equals(nodeHash)) {
LOG.debug("Found node {} with getTrieNode request", nodeHash);
return Optional.of(nodeValue);
} else {
LOG.debug("Found invalid node {} with getTrieNode request", nodeHash);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.debug("Error when trying to find node {} with getTrieNode request", nodeHash);
}
return Optional.empty();
}
}

View File

@@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -66,6 +67,7 @@ public class BackwardSyncStepTest {
private final DeterministicEthScheduler ethScheduler = new DeterministicEthScheduler();
private MutableBlockchain localBlockchain;
private MutableBlockchain remoteBlockchain;
private RespondingEthPeer peer;
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
@@ -91,7 +93,7 @@ public class BackwardSyncStepTest {
Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock);
MutableBlockchain localBlockchain = createInMemoryBlockchain(genesisBlock);
localBlockchain = spy(createInMemoryBlockchain(genesisBlock));
for (int i = 1; i <= REMOTE_HEIGHT; i++) {
final BlockDataGenerator.BlockOptions options =
@@ -171,7 +173,10 @@ public class BackwardSyncStepTest {
final CompletableFuture<List<BlockHeader>> future =
step.requestHeaders(lookingForBlock.getHeader().getHash());
assertThat(future.get().isEmpty()).isTrue();
verify(localBlockchain).getBlockHeader(lookingForBlock.getHash());
verify(context, never()).getEthContext();
final BlockHeader blockHeader = future.get().get(0);
assertThat(blockHeader).isEqualTo(lookingForBlock.getHeader());
}
@Test

View File

@@ -80,8 +80,7 @@ public class FastDownloaderFactoryTest {
ethContext,
worldStateStorage,
syncState,
clock,
false))
clock))
.isInstanceOf(IllegalStateException.class);
}
@@ -102,8 +101,7 @@ public class FastDownloaderFactoryTest {
ethContext,
worldStateStorage,
syncState,
clock,
false);
clock);
assertThat(result).isEmpty();
}
@@ -127,8 +125,7 @@ public class FastDownloaderFactoryTest {
ethContext,
worldStateStorage,
syncState,
clock,
false);
clock);
verify(mutableBlockchain).getChainHeadBlockNumber();
}
@@ -158,8 +155,7 @@ public class FastDownloaderFactoryTest {
ethContext,
worldStateStorage,
syncState,
clock,
false);
clock);
verify(worldStateStorage).clear();
assertThat(Files.exists(stateQueueDir)).isFalse();
@@ -191,8 +187,7 @@ public class FastDownloaderFactoryTest {
ethContext,
worldStateStorage,
syncState,
clock,
false))
clock))
.isInstanceOf(IllegalStateException.class);
}

View File

@@ -1,301 +0,0 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.worldstate;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.EthProtocolVersion;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.task.SnapProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@SuppressWarnings({"FieldCanBeLocal", "unused"})
@RunWith(MockitoJUnitRunner.class)
public class WorldStatePeerTrieNodeFinderTest {
WorldStatePeerTrieNodeFinder worldStatePeerTrieNodeFinder;
private final BlockHeaderTestFixture blockHeaderBuilder = new BlockHeaderTestFixture();
@Mock private Blockchain blockchain;
private EthProtocolManager ethProtocolManager;
private SnapProtocolManager snapProtocolManager;
private EthPeers ethPeers;
@Before
public void setup() throws Exception {
ethProtocolManager = spy(EthProtocolManagerTestUtil.create());
ethPeers = ethProtocolManager.ethContext().getEthPeers();
snapProtocolManager = SnapProtocolManagerTestUtil.create(ethPeers);
worldStatePeerTrieNodeFinder =
spy(
new WorldStatePeerTrieNodeFinder(
ethProtocolManager.ethContext(),
ethProtocolManager,
blockchain,
new NoOpMetricsSystem()));
}
private RespondingEthPeer.Responder respondToGetNodeDataRequest(
final RespondingEthPeer peer, final Bytes32 nodeValue) {
return RespondingEthPeer.targetedResponder(
(cap, msg) -> {
if (msg.getCode() != EthPV63.GET_NODE_DATA) {
return false;
}
return true;
},
(cap, msg) -> NodeDataMessage.create(List.of(nodeValue)));
}
private RespondingEthPeer.Responder respondToGetTrieNodeRequest(
final RespondingEthPeer peer, final Bytes32 nodeValue) {
return RespondingEthPeer.targetedResponder(
(cap, msg) -> {
if (msg.getCode() != SnapV1.GET_TRIE_NODES) {
return false;
}
return true;
},
(cap, msg) -> TrieNodesMessage.create(Optional.of(BigInteger.ONE), List.of(nodeValue)));
}
@Test
public void getAccountStateTrieNodeShouldReturnValueFromGetNodeDataRequest() {
BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
when(ethProtocolManager.getHighestProtocolVersion()).thenReturn(EthProtocolVersion.V66);
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetNodeDataRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStateTrieNode(Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStateTrieNodeShouldReturnValueFromGetTrieNodeRequest() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
when(ethProtocolManager.getHighestProtocolVersion()).thenReturn(EthProtocolVersion.V66);
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
SnapProtocolManagerTestUtil.createPeer(
ethProtocolManager, snapProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetTrieNodeRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStateTrieNode(Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStateTrieNodeShouldReturnEmptyWhenFoundNothing() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
when(ethProtocolManager.getHighestProtocolVersion()).thenReturn(EthProtocolVersion.V66);
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStateTrieNode(Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).isEmpty();
}
@Test
public void getAccountStorageTrieNodeShouldReturnValueFromGetNodeDataRequest() {
BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
when(ethProtocolManager.getHighestProtocolVersion()).thenReturn(EthProtocolVersion.V66);
final Hash accountHash = Hash.wrap(Bytes32.random());
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetNodeDataRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(accountHash, Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStorageTrieNodeShouldReturnValueFromGetTrieNodeRequest() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
when(ethProtocolManager.getHighestProtocolVersion()).thenReturn(EthProtocolVersion.V66);
final Hash accountHash = Hash.wrap(Bytes32.random());
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
final RespondingEthPeer targetPeer =
SnapProtocolManagerTestUtil.createPeer(
ethProtocolManager, snapProtocolManager, blockHeader.getNumber());
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
new Thread(
() ->
targetPeer.respondWhileOtherThreadsWork(
respondToGetTrieNodeRequest(targetPeer, nodeValue),
() -> response.accountStateTrieNode.isEmpty()))
.start();
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(accountHash, Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).contains(nodeValue);
}
@Test
public void getAccountStorageTrieNodeShouldReturnEmptyWhenFoundNothing() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
final Hash accountHash = Hash.wrap(Bytes32.random());
final Bytes32 nodeValue = Bytes32.random();
final Bytes32 nodeHash = Hash.hash(nodeValue);
var response =
new Object() {
Optional<Bytes> accountStateTrieNode = Optional.empty();
};
response.accountStateTrieNode =
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(accountHash, Bytes.EMPTY, nodeHash);
Assertions.assertThat(response.accountStateTrieNode).isEmpty();
}
@Test
public void getNodeDataRequestShouldBeCalled_IfProtocolIsEth66() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
when(ethProtocolManager.getHighestProtocolVersion()).thenReturn(EthProtocolVersion.V66);
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(
Hash.wrap(Bytes32.random()), Bytes.EMPTY, Hash.wrap(Bytes32.random()));
// assert findByGetNodeData is called once
verify(worldStatePeerTrieNodeFinder, times(1)).findByGetNodeData(any());
// assert findByGetTrieNodeData is called once
verify(worldStatePeerTrieNodeFinder, times(1)).findByGetTrieNodeData(any(), any(), any());
}
@Test
public void getNodeDataRequestShouldNotBeCalled_IfProtocolIsEth67() {
final BlockHeader blockHeader = blockHeaderBuilder.number(1000).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(blockHeader);
when(ethProtocolManager.getHighestProtocolVersion()).thenReturn(EthProtocolVersion.V67);
worldStatePeerTrieNodeFinder.getAccountStorageTrieNode(
Hash.wrap(Bytes32.random()), Bytes.EMPTY, Hash.wrap(Bytes32.random()));
// assert findByGetNodeData is never called
verify(worldStatePeerTrieNodeFinder, never()).findByGetNodeData(any());
// assert findByGetTrieNodeData is called once
verify(worldStatePeerTrieNodeFinder, times(1)).findByGetTrieNodeData(any(), any(), any());
}
}

View File

@@ -16,12 +16,15 @@
package org.hyperledger.besu.ethereum.retesteth;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.plugin.data.Address;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.tuweni.bytes.Bytes;
/**
* Naive implementation of Synchronizer used by retesteth. Because retesteth is not implemented in
* the test module, it has no access to mockito. This class provides a minimum implementation needed
@@ -49,6 +52,12 @@ public class DummySynchronizer implements Synchronizer {
return false;
}
@Override
public boolean healWorldState(
final Optional<Address> maybeAccountToRepair, final Bytes location) {
return false;
}
@Override
public long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener) {
return 0;

View File

@@ -14,6 +14,10 @@
*/
package org.hyperledger.besu.ethereum.trie;
import org.hyperledger.besu.plugin.data.Address;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
@@ -23,6 +27,7 @@ import org.apache.tuweni.bytes.Bytes32;
*/
public class MerkleTrieException extends RuntimeException {
private Optional<Address> maybeAddress;
private Bytes32 hash;
private Bytes location;
@@ -34,12 +39,28 @@ public class MerkleTrieException extends RuntimeException {
super(message);
this.hash = hash;
this.location = location;
this.maybeAddress = Optional.empty();
}
public MerkleTrieException(final String message, final Exception cause) {
super(message, cause);
}
public MerkleTrieException(
final String message,
final Optional<Address> maybeAddress,
final Bytes32 hash,
final Bytes location) {
super(message);
this.hash = hash;
this.location = location;
this.maybeAddress = maybeAddress;
}
public Optional<Address> getMaybeAddress() {
return maybeAddress;
}
public Bytes32 getHash() {
return hash;
}

View File

@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.DoubleSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableSet;
@@ -179,10 +180,28 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
}
private void addCollectorUnchecked(final MetricCategory category, final Collector metric) {
metric.register(registry);
collectors
.computeIfAbsent(category, key -> Collections.newSetFromMap(new ConcurrentHashMap<>()))
.add(metric);
final Collection<Collector> metrics =
this.collectors.computeIfAbsent(
category, key -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
final List<String> newSamples =
metric.collect().stream()
.map(metricFamilySamples -> metricFamilySamples.name)
.collect(Collectors.toList());
metrics.stream()
.filter(
collector ->
collector.collect().stream()
.anyMatch(metricFamilySamples -> newSamples.contains(metricFamilySamples.name)))
.findFirst()
.ifPresent(
collector -> {
metrics.remove(collector);
registry.unregister(collector);
});
metrics.add(metric.register(registry));
}
@Override

View File

@@ -217,13 +217,13 @@ public class PrometheusMetricsSystemTest {
}
@Test
public void shouldNotAllowDuplicateGaugeCreation() {
// Gauges have a reference to the source of their data so creating it twice will still only
// pull data from the first instance, possibly leaking memory and likely returning the wrong
// results.
public void shouldAllowDuplicateGaugeCreation() {
// When we are pushing the same gauge, the first one will be unregistered and the new one will
// be used
metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0);
assertThatThrownBy(() -> metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0))
.isInstanceOf(IllegalArgumentException.class);
metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0);
assertThat(metricsSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(JVM, "myValue", 7.0, emptyList()));
}
@Test