[PAN-3064] sweep state roots before child nodes (#1854)

* sweep state roots before child nodes

* Adds long removeAccountStateTrieNode(key) method to WorldStateStorage.Updater

* remove assertj assertions from `AbstractKeyValueStorageTest`
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
This commit is contained in:
Ratan Rai Sur
2019-08-20 20:07:12 -04:00
committed by GitHub
parent 8f5e09d4b7
commit 1716091723
13 changed files with 467 additions and 84 deletions

View File

@@ -87,8 +87,8 @@ public interface Blockchain {
default boolean blockIsOnCanonicalChain(final Hash blockHeaderHash) {
return getBlockHeader(blockHeaderHash)
.flatMap(h -> getBlockHashByNumber(h.getNumber()))
.map(h -> h.equals(blockHeaderHash))
.orElse(false);
.filter(h -> h.equals(blockHeaderHash))
.isPresent();
}
/**

View File

@@ -110,6 +110,12 @@ public class WorldStateKeyValueStorage implements WorldStateStorage {
this.nodeAddedListeners = nodeAddedListeners;
}
@Override
public Updater removeAccountStateTrieNode(final Bytes32 nodeHash) {
transaction.remove(nodeHash);
return this;
}
@Override
public Updater putCode(final Bytes32 codeHash, final BytesValue code) {
if (code.size() == 0) {

View File

@@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.worldstate;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
@@ -31,14 +32,18 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MarkSweepPruner {
private static final int DEFAULT_OPS_PER_TRANSACTION = 1000;
private static final Logger LOG = LogManager.getLogger();
private static final BytesValue IN_USE = BytesValue.of(1);
private static final int MARKS_PER_TRANSACTION = 1000;
private final int operationsPerTransaction;
private final WorldStateStorage worldStateStorage;
private final MutableBlockchain blockchain;
private final KeyValueStorage markStorage;
private final Counter markedNodesCounter;
private final Counter markOperationCounter;
@@ -50,10 +55,22 @@ public class MarkSweepPruner {
public MarkSweepPruner(
final WorldStateStorage worldStateStorage,
final MutableBlockchain blockchain,
final KeyValueStorage markStorage,
final MetricsSystem metricsSystem) {
this(worldStateStorage, blockchain, markStorage, metricsSystem, DEFAULT_OPS_PER_TRANSACTION);
}
public MarkSweepPruner(
final WorldStateStorage worldStateStorage,
final MutableBlockchain blockchain,
final KeyValueStorage markStorage,
final MetricsSystem metricsSystem,
final int operationsPerTransaction) {
this.worldStateStorage = worldStateStorage;
this.markStorage = markStorage;
this.blockchain = blockchain;
this.operationsPerTransaction = operationsPerTransaction;
markedNodesCounter =
metricsSystem.createCounter(
@@ -104,11 +121,34 @@ public class MarkSweepPruner {
LOG.info("Completed marking used nodes for pruning");
}
public void sweep() {
public void sweepBefore(final long markedBlockNumber) {
flushPendingMarks();
sweepOperationCounter.inc();
LOG.info("Sweeping unused nodes");
final long prunedNodeCount = worldStateStorage.prune(markStorage::containsKey);
// Sweep state roots first, walking backwards until we get to a state root that isn't in the
// storage
long prunedNodeCount = 0;
WorldStateStorage.Updater updater = worldStateStorage.updater();
for (long blockNumber = markedBlockNumber - 1; blockNumber >= 0; blockNumber--) {
final Hash candidateStateRootHash =
blockchain.getBlockHeader(blockNumber).get().getStateRoot();
if (!worldStateStorage.isWorldStateAvailable(candidateStateRootHash)) {
break;
}
if (!markStorage.containsKey(candidateStateRootHash)) {
updater.removeAccountStateTrieNode(candidateStateRootHash);
prunedNodeCount++;
if (prunedNodeCount % operationsPerTransaction == 0) {
updater.commit();
updater = worldStateStorage.updater();
}
}
}
updater.commit();
// Sweep non-state-root nodes
prunedNodeCount += worldStateStorage.prune(markStorage::containsKey);
sweptNodesCounter.inc(prunedNodeCount);
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
markStorage.clear();
@@ -139,7 +179,8 @@ public class MarkSweepPruner {
.visitAll(storageNode -> markNode(storageNode.getHash()));
}
private void markNode(final Bytes32 hash) {
@VisibleForTesting
void markNode(final Bytes32 hash) {
markedNodesCounter.inc();
markLock.lock();
try {
@@ -151,7 +192,7 @@ public class MarkSweepPruner {
}
private void maybeFlushPendingMarks() {
if (pendingMarks.size() > MARKS_PER_TRANSACTION) {
if (pendingMarks.size() > operationsPerTransaction) {
flushPendingMarks();
}
}

View File

@@ -103,7 +103,7 @@ public class Pruner {
"Begin sweeping unused nodes for pruning. Retention period: {}", retentionPeriodInBlocks);
execute(
() -> {
pruningStrategy.sweep();
pruningStrategy.sweepBefore(markBlockNumber);
state.compareAndSet(State.SWEEPING, State.IDLE);
});
}

View File

@@ -46,6 +46,8 @@ public interface WorldStateStorage {
interface Updater {
Updater removeAccountStateTrieNode(Bytes32 nodeHash);
Updater putCode(Bytes32 nodeHash, BytesValue code);
default Updater putCode(final BytesValue code) {

View File

@@ -205,6 +205,7 @@ public class BlockDataGenerator {
final BlockOptions options =
new BlockOptions()
.setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER)
.setStateRoot(Hash.EMPTY_TRIE_HASH)
.setParentHash(Hash.ZERO);
return block(options);
}

View File

@@ -13,58 +13,350 @@
package tech.pegasys.pantheon.ethereum.worldstate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.spy;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.MutableWorldState;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.WorldState;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.trie.StoredMerklePatriciaTrie;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.junit.Test;
import org.mockito.InOrder;
public class MarkSweepPrunerTest {
private final BlockDataGenerator gen = new BlockDataGenerator();
private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Map<BytesValue, BytesValue> hashValueStore = spy(new HashMap<>());
private final InMemoryKeyValueStorage stateStorage = spy(new TestInMemoryStorage(hashValueStore));
private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage);
private final WorldStateArchive worldStateArchive =
new WorldStateArchive(
worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage()));
private final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage();
private final Block genesisBlock = gen.genesisBlock();
private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock);
@Test
public void shouldMarkAllNodesInCurrentWorldState() {
public void prepareMarkAndSweep_smallState_manyOpsPerTx() {
testPrepareMarkAndSweep(3, 1, 2, 1000);
}
// Setup "remote" state
final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage();
final InMemoryKeyValueStorage stateStorage = new InMemoryKeyValueStorage();
final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage);
final WorldStateArchive worldStateArchive =
new WorldStateArchive(
worldStateStorage,
new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage()));
final MutableWorldState worldState = worldStateArchive.getMutable();
@Test
public void prepareMarkAndSweep_largeState_fewOpsPerTx() {
testPrepareMarkAndSweep(20, 5, 5, 5);
}
@Test
public void prepareMarkAndSweep_emptyBlocks() {
testPrepareMarkAndSweep(10, 0, 5, 10);
}
@Test
public void prepareMarkAndSweep_markChainhead() {
testPrepareMarkAndSweep(10, 2, 10, 20);
}
@Test
public void prepareMarkAndSweep_markGenesis() {
testPrepareMarkAndSweep(10, 2, 0, 20);
}
@Test
public void prepareMarkAndSweep_multipleRounds() {
testPrepareMarkAndSweep(10, 2, 10, 20);
testPrepareMarkAndSweep(10, 2, 15, 20);
}
private void testPrepareMarkAndSweep(
final int numBlocks,
final int accountsPerBlock,
final int markBlockNumber,
final int opsPerTransaction) {
final MarkSweepPruner pruner =
new MarkSweepPruner(
worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction);
final int chainHeight = (int) blockchain.getChainHead().getHeight();
// Generate blocks up to markBlockNumber
final int blockCountBeforeMarkedBlock = markBlockNumber - chainHeight;
generateBlockchainData(blockCountBeforeMarkedBlock, accountsPerBlock);
// Prepare
pruner.prepare();
// Mark
final BlockHeader markBlock = blockchain.getBlockHeader(markBlockNumber).get();
pruner.mark(markBlock.getStateRoot());
// Generate more blocks that should be kept
generateBlockchainData(numBlocks - blockCountBeforeMarkedBlock, accountsPerBlock);
// Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = collectWorldStateNodes(markBlock.getStateRoot());
for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
collectWorldStateNodes(stateRoot, expectedNodes);
}
if (accountsPerBlock != 0 && markBlockNumber > 0) {
assertThat(hashValueStore.size()).isGreaterThan(expectedNodes.size()); // Sanity check
}
// Sweep
pruner.sweepBefore(markBlock.getNumber());
// Assert that blocks from mark point onward are still accessible
for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
assertThat(worldStateArchive.get(stateRoot)).isPresent();
final WorldState markedState = worldStateArchive.get(stateRoot).get();
// Traverse accounts and make sure all are accessible
final int expectedAccounts = accountsPerBlock * i;
final long accounts = markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count();
assertThat(accounts).isEqualTo(expectedAccounts);
// Traverse storage to ensure that all storage is accessible
markedState
.streamAccounts(Bytes32.ZERO, expectedAccounts * 2)
.forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000));
}
// All other state roots should have been removed
for (int i = 0; i < markBlockNumber; i++) {
final BlockHeader curHeader = blockchain.getBlockHeader(i + 1L).get();
if (curHeader.getNumber() == markBlock.getNumber()) {
continue;
}
if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) {
assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty();
}
}
// Check that storage contains only the values we expect
assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size());
assertThat(hashValueStore.values()).containsExactlyInAnyOrderElementsOf(expectedNodes);
}
@Test
public void mark_marksAllExpectedNodes() {
final MarkSweepPruner pruner =
new MarkSweepPruner(worldStateStorage, blockchain, markStorage, metricsSystem);
// Generate accounts and save corresponding state root
gen.createRandomContractAccountsWithNonEmptyStorage(worldState, 20);
final Hash stateRoot = worldState.rootHash();
final int numBlocks = 15;
final int numAccounts = 10;
generateBlockchainData(numBlocks, numAccounts);
final int markBlockNumber = 10;
final BlockHeader markBlock = blockchain.getBlockHeader(markBlockNumber).get();
// Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = collectWorldStateNodes(markBlock.getStateRoot());
assertThat(hashValueStore.size()).isGreaterThan(expectedNodes.size()); // Sanity check
// Mark and sweep
pruner.mark(markBlock.getStateRoot());
pruner.sweepBefore(markBlock.getNumber());
// Assert that the block we marked is still present and all accounts are accessible
assertThat(worldStateArchive.get(markBlock.getStateRoot())).isPresent();
final WorldState markedState = worldStateArchive.get(markBlock.getStateRoot()).get();
// Traverse accounts and make sure all are accessible
final int expectedAccounts = numAccounts * markBlockNumber;
final long accounts = markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count();
assertThat(accounts).isEqualTo(expectedAccounts);
// Traverse storage to ensure that all storage is accessible
markedState
.streamAccounts(Bytes32.ZERO, expectedAccounts * 2)
.forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000));
// All other state roots should have been removed
for (int i = 0; i < numBlocks; i++) {
final BlockHeader curHeader = blockchain.getBlockHeader(i + 1L).get();
if (curHeader.getNumber() == markBlock.getNumber()) {
continue;
}
assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty();
}
// Check that storage contains only the values we expect
assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size());
assertThat(hashValueStore.values()).containsExactlyInAnyOrderElementsOf(expectedNodes);
}
@Test
public void sweepBefore_shouldSweepStateRootFirst() {
final MarkSweepPruner pruner =
new MarkSweepPruner(worldStateStorage, markStorage, metricsSystem);
pruner.mark(stateRoot);
pruner.flushPendingMarks();
new MarkSweepPruner(worldStateStorage, blockchain, markStorage, metricsSystem, 1);
final Set<BytesValue> keysToKeep = new HashSet<>(stateStorage.keySet());
assertThat(markStorage.keySet()).containsExactlyInAnyOrderElementsOf(keysToKeep);
// Generate accounts and save corresponding state root
final int numBlocks = 15;
final int numAccounts = 10;
generateBlockchainData(numBlocks, numAccounts);
// Generate some more nodes from a world state we didn't mark
gen.createRandomContractAccountsWithNonEmptyStorage(worldStateArchive.getMutable(), 10);
assertThat(stateStorage.keySet()).hasSizeGreaterThan(keysToKeep.size());
final int markBlockNumber = 10;
final BlockHeader markBlock = blockchain.getBlockHeader(markBlockNumber).get();
// All those new nodes should be removed when we sweep
pruner.sweep();
assertThat(stateStorage.keySet()).containsExactlyInAnyOrderElementsOf(keysToKeep);
assertThat(markStorage.keySet()).isEmpty();
// Collect state roots we expect to be swept first
final List<Bytes32> stateRoots = new ArrayList<>();
for (int i = markBlockNumber - 1; i >= 0; i--) {
stateRoots.add(blockchain.getBlockHeader(i).get().getStateRoot());
}
// Mark and sweep
pruner.mark(markBlock.getStateRoot());
pruner.sweepBefore(markBlock.getNumber());
// Check stateRoots are marked first
InOrder inOrder = inOrder(hashValueStore, stateStorage);
for (Bytes32 stateRoot : stateRoots) {
inOrder.verify(hashValueStore).remove(stateRoot);
}
inOrder.verify(stateStorage).removeUnless(any());
}
@Test
public void sweepBefore_shouldNotRemoveMarkedStateRoots() {
final MarkSweepPruner pruner =
new MarkSweepPruner(worldStateStorage, blockchain, markStorage, metricsSystem, 1);
// Generate accounts and save corresponding state root
final int numBlocks = 15;
final int numAccounts = 10;
generateBlockchainData(numBlocks, numAccounts);
final int markBlockNumber = 10;
final BlockHeader markBlock = blockchain.getBlockHeader(markBlockNumber).get();
// Collect state roots we expect to be swept first
final List<Bytes32> stateRoots = new ArrayList<>();
for (int i = markBlockNumber - 1; i >= 0; i--) {
stateRoots.add(blockchain.getBlockHeader(i).get().getStateRoot());
}
// Mark
pruner.mark(markBlock.getStateRoot());
// Mark an extra state root
Hash markedRoot = Hash.wrap(stateRoots.remove(stateRoots.size() / 2));
pruner.markNode(markedRoot);
// Sweep
pruner.sweepBefore(markBlock.getNumber());
// Check stateRoots are marked first
InOrder inOrder = inOrder(hashValueStore, stateStorage);
for (Bytes32 stateRoot : stateRoots) {
inOrder.verify(hashValueStore).remove(stateRoot);
}
inOrder.verify(stateStorage).removeUnless(any());
assertThat(stateStorage.containsKey(markedRoot)).isTrue();
}
private void generateBlockchainData(final int numBlocks, final int numAccounts) {
Block parentBlock = blockchain.getChainHeadBlock();
for (int i = 0; i < numBlocks; i++) {
final MutableWorldState worldState =
worldStateArchive.getMutable(parentBlock.getHeader().getStateRoot()).get();
gen.createRandomContractAccountsWithNonEmptyStorage(worldState, numAccounts);
final Hash stateRoot = worldState.rootHash();
final Block block =
gen.block(
BlockOptions.create()
.setStateRoot(stateRoot)
.setBlockNumber(parentBlock.getHeader().getNumber() + 1L)
.setParentHash(parentBlock.getHash()));
final List<TransactionReceipt> receipts = gen.receipts(block);
blockchain.appendBlock(block, receipts);
parentBlock = block;
}
}
private Set<BytesValue> collectWorldStateNodes(final Hash stateRootHash) {
final Set<BytesValue> nodeData = new HashSet<>();
collectWorldStateNodes(stateRootHash, nodeData);
return nodeData;
}
private Set<BytesValue> collectWorldStateNodes(
final Hash stateRootHash, final Set<BytesValue> collector) {
final List<Hash> storageRoots = new ArrayList<>();
final MerklePatriciaTrie<Bytes32, BytesValue> stateTrie = createStateTrie(stateRootHash);
// Collect storage roots and code
stateTrie
.entriesFrom(Bytes32.ZERO, 1000)
.forEach(
(key, val) -> {
final StateTrieAccountValue accountValue =
StateTrieAccountValue.readFrom(RLP.input(val));
stateStorage.get(accountValue.getCodeHash()).ifPresent(collector::add);
storageRoots.add(accountValue.getStorageRoot());
});
// Collect state nodes
collectTrieNodes(stateTrie, collector);
// Collect storage nodes
for (Hash storageRoot : storageRoots) {
final MerklePatriciaTrie<Bytes32, BytesValue> storageTrie = createStorageTrie(storageRoot);
collectTrieNodes(storageTrie, collector);
}
return collector;
}
private void collectTrieNodes(
final MerklePatriciaTrie<Bytes32, BytesValue> trie, final Set<BytesValue> collector) {
final Bytes32 rootHash = trie.getRootHash();
trie.visitAll(
(node) -> {
if (node.isReferencedByHash() || node.getHash().equals(rootHash)) {
collector.add(node.getRlp());
}
});
}
private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode,
rootHash,
Function.identity(),
Function.identity());
}
private MerklePatriciaTrie<Bytes32, BytesValue> createStorageTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStorageTrieNode,
rootHash,
Function.identity(),
Function.identity());
}
private static class TestInMemoryStorage extends InMemoryKeyValueStorage {
public TestInMemoryStorage(final Map<BytesValue, BytesValue> hashValueStore) {
super(hashValueStore);
}
}
}

View File

@@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.worldstate;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -68,7 +69,7 @@ public class PrunerTest {
appendBlockWithParent(blockchain, blockchain.getChainHeadBlock());
verify(markSweepPruner).mark(block1.getHeader().getStateRoot());
verify(markSweepPruner).sweep();
verify(markSweepPruner).sweepBefore(1);
pruner.stop();
}
@@ -87,14 +88,14 @@ public class PrunerTest {
final Hash markBlockStateRootHash =
appendBlockWithParent(blockchain, genesisBlock).getHeader().getStateRoot();
verify(markSweepPruner, never()).mark(markBlockStateRootHash);
verify(markSweepPruner, never()).sweep();
verify(markSweepPruner, never()).sweepBefore(anyLong());
appendBlockWithParent(blockchain, blockchain.getChainHeadBlock());
verify(markSweepPruner).mark(markBlockStateRootHash);
verify(markSweepPruner, never()).sweep();
verify(markSweepPruner, never()).sweepBefore(anyLong());
appendBlockWithParent(blockchain, blockchain.getChainHeadBlock());
verify(markSweepPruner).sweep();
verify(markSweepPruner).sweepBefore(1);
pruner.stop();
}
@@ -113,7 +114,7 @@ public class PrunerTest {
/*
Set up pre-marking state:
O <---- marking of the this block's parent will begin when this block is added
O <---- marking of this block's parent will begin when this block is added
|
| O <- this is a fork as of now (non-canonical)
O | <- this is the initially canonical block that will be marked
@@ -135,7 +136,7 @@ public class PrunerTest {
*/
appendBlockWithParent(blockchain, forkBlock);
verify(markSweepPruner).mark(initiallyCanonicalBlock.getHeader().getStateRoot());
verify(markSweepPruner, never()).sweep();
verify(markSweepPruner, never()).sweepBefore(anyLong());
pruner.stop();
}

View File

@@ -206,9 +206,10 @@ public abstract class PantheonControllerBuilder<C> {
new Pruner(
new MarkSweepPruner(
protocolContext.getWorldStateArchive().getWorldStateStorage(),
blockchain,
storageProvider.createPruningStorage(),
metricsSystem),
protocolContext.getBlockchain(),
blockchain,
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)

View File

@@ -171,7 +171,7 @@ public class ColumnarRocksDbKeyValueStorage
rocksIterator.next();
}
} catch (final RocksDBException e) {
throw new KeyValueStorage.StorageException(e);
throw new StorageException(e);
}
return removedNodeCounter;
}
@@ -188,7 +188,7 @@ public class ColumnarRocksDbKeyValueStorage
}
}
} catch (final RocksDBException e) {
throw new KeyValueStorage.StorageException(e);
throw new StorageException(e);
}
}

View File

@@ -14,7 +14,6 @@ package tech.pegasys.pantheon.services.kvstore;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -27,9 +26,17 @@ import java.util.function.Predicate;
public class InMemoryKeyValueStorage implements KeyValueStorage {
private final Map<BytesValue, BytesValue> hashValueStore = new HashMap<>();
private final Map<BytesValue, BytesValue> hashValueStore;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public InMemoryKeyValueStorage() {
this(new HashMap<>());
}
protected InMemoryKeyValueStorage(final Map<BytesValue, BytesValue> hashValueStore) {
this.hashValueStore = hashValueStore;
}
@Override
public void clear() {
final Lock lock = rwLock.writeLock();
@@ -68,8 +75,15 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
@Override
public long removeUnless(final Predicate<BytesValue> inUseCheck) {
hashValueStore.keySet().removeIf(key -> !inUseCheck.test(key));
return 0;
final Lock lock = rwLock.writeLock();
lock.lock();
try {
long initialSize = hashValueStore.keySet().size();
hashValueStore.keySet().removeIf(key -> !inUseCheck.test(key));
return initialSize - hashValueStore.keySet().size();
} finally {
lock.unlock();
}
}
@Override
@@ -78,7 +92,7 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
}
public Set<BytesValue> keySet() {
return Collections.unmodifiableSet(new HashSet<>(hashValueStore.keySet()));
return Set.copyOf(hashValueStore.keySet());
}
private class InMemoryTransaction extends AbstractTransaction {
@@ -104,7 +118,7 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
lock.lock();
try {
hashValueStore.putAll(updatedValues);
removedKeys.forEach(k -> hashValueStore.remove(k));
removedKeys.forEach(hashValueStore::remove);
updatedValues = null;
removedKeys = null;
} finally {

View File

@@ -78,8 +78,15 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage {
@Override
public long removeUnless(final Predicate<BytesValue> inUseCheck) {
storage.asMap().keySet().removeIf(key -> !inUseCheck.test(key));
return 0;
final Lock lock = rwLock.writeLock();
lock.lock();
try {
final long initialSize = storage.size();
storage.asMap().keySet().removeIf(key -> !inUseCheck.test(key));
return initialSize - storage.size();
} finally {
lock.unlock();
}
}
@Override

View File

@@ -12,10 +12,7 @@
*/
package tech.pegasys.pantheon.services.kvstore;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage.Transaction;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -43,7 +40,7 @@ public abstract class AbstractKeyValueStorageTest {
tx.put(BytesValue.fromHexString("0001"), BytesValue.fromHexString("0FFF"));
tx.commit();
final Optional<BytesValue> result = store2.get(BytesValue.fromHexString("0001"));
assertEquals(Optional.empty(), result);
assertThat(result).isEmpty();
}
@Test
@@ -53,14 +50,30 @@ public abstract class AbstractKeyValueStorageTest {
Transaction tx = store.startTransaction();
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC"));
tx.commit();
assertEquals(
Optional.of(BytesValue.fromHexString("0ABC")), store.get(BytesValue.fromHexString("0F")));
assertThat(store.get(BytesValue.fromHexString("0F")))
.contains(BytesValue.fromHexString("0ABC"));
tx = store.startTransaction();
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0DEF"));
tx.commit();
assertEquals(
Optional.of(BytesValue.fromHexString("0DEF")), store.get(BytesValue.fromHexString("0F")));
assertThat(store.get(BytesValue.fromHexString("0F")))
.contains(BytesValue.fromHexString("0DEF"));
}
@Test
public void removeUnless() throws Exception {
final KeyValueStorage store = createStore();
Transaction tx = store.startTransaction();
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC"));
tx.put(BytesValue.fromHexString("10"), BytesValue.fromHexString("0ABC"));
tx.put(BytesValue.fromHexString("11"), BytesValue.fromHexString("0ABC"));
tx.put(BytesValue.fromHexString("12"), BytesValue.fromHexString("0ABC"));
tx.commit();
store.removeUnless(bv -> bv.toString().contains("1"));
assertThat(store.containsKey(BytesValue.fromHexString("0F"))).isFalse();
assertThat(store.containsKey(BytesValue.fromHexString("10"))).isTrue();
assertThat(store.containsKey(BytesValue.fromHexString("11"))).isTrue();
assertThat(store.containsKey(BytesValue.fromHexString("12"))).isTrue();
}
@Test
@@ -68,13 +81,13 @@ public abstract class AbstractKeyValueStorageTest {
final KeyValueStorage store = createStore();
final BytesValue key = BytesValue.fromHexString("ABCD");
assertFalse(store.containsKey(key));
assertThat(store.containsKey(key)).isFalse();
final Transaction transaction = store.startTransaction();
transaction.put(key, BytesValue.fromHexString("DEFF"));
transaction.commit();
assertTrue(store.containsKey(key));
assertThat(store.containsKey(key)).isTrue();
}
@Test
@@ -86,7 +99,7 @@ public abstract class AbstractKeyValueStorageTest {
tx = store.startTransaction();
tx.remove(BytesValue.fromHexString("0F"));
tx.commit();
assertEquals(Optional.empty(), store.get(BytesValue.fromHexString("0F")));
assertThat(store.get(BytesValue.fromHexString("0F"))).isEmpty();
}
@Test
@@ -96,7 +109,7 @@ public abstract class AbstractKeyValueStorageTest {
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC"));
tx.remove(BytesValue.fromHexString("0F"));
tx.commit();
assertEquals(Optional.empty(), store.get(BytesValue.fromHexString("0F")));
assertThat(store.get(BytesValue.fromHexString("0F"))).isEmpty();
}
@Test
@@ -105,7 +118,7 @@ public abstract class AbstractKeyValueStorageTest {
Transaction tx = store.startTransaction();
tx.remove(BytesValue.fromHexString("0F"));
tx.commit();
assertEquals(Optional.empty(), store.get(BytesValue.fromHexString("0F")));
assertThat(store.get(BytesValue.fromHexString("0F"))).isEmpty();
}
@Test
@@ -140,7 +153,7 @@ public abstract class AbstractKeyValueStorageTest {
for (int i = 0; i < keyCount; i++) {
final BytesValue key = BytesValues.toMinimalBytes(i);
final BytesValue actual = store.get(key).get();
assertTrue(actual.equals(a) || actual.equals(b));
assertThat(actual.equals(a) || actual.equals(b)).isTrue();
}
store.close();
@@ -164,18 +177,23 @@ public abstract class AbstractKeyValueStorageTest {
tx.put(BytesValue.of(4), BytesValue.of(8));
// Check values before committing have not changed
assertEquals(store.get(BytesValue.of(1)).get(), BytesValue.of(1));
assertEquals(store.get(BytesValue.of(2)).get(), BytesValue.of(2));
assertEquals(store.get(BytesValue.of(3)).get(), BytesValue.of(3));
assertEquals(store.get(BytesValue.of(4)), Optional.empty());
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1));
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2));
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3));
assertThat(store.get(BytesValue.of(4))).isEmpty();
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1));
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2));
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3));
assertThat(store.get(BytesValue.of(4))).isEmpty();
tx.commit();
// Check that values have been updated after commit
assertEquals(store.get(BytesValue.of(1)).get(), BytesValue.of(1));
assertEquals(store.get(BytesValue.of(2)).get(), BytesValue.of(4));
assertEquals(store.get(BytesValue.of(3)), Optional.empty());
assertEquals(store.get(BytesValue.of(4)).get(), BytesValue.of(8));
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1));
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(4));
assertThat(store.get(BytesValue.of(3))).isEmpty();
assertThat(store.get(BytesValue.of(4))).contains(BytesValue.of(8));
}
@Test
@@ -196,18 +214,18 @@ public abstract class AbstractKeyValueStorageTest {
tx.put(BytesValue.of(4), BytesValue.of(8));
// Check values before committing have not changed
assertEquals(store.get(BytesValue.of(1)).get(), BytesValue.of(1));
assertEquals(store.get(BytesValue.of(2)).get(), BytesValue.of(2));
assertEquals(store.get(BytesValue.of(3)).get(), BytesValue.of(3));
assertEquals(store.get(BytesValue.of(4)), Optional.empty());
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1));
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2));
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3));
assertThat(store.get(BytesValue.of(4))).isEmpty();
tx.rollback();
// Check that values have not changed after rollback
assertEquals(store.get(BytesValue.of(1)).get(), BytesValue.of(1));
assertEquals(store.get(BytesValue.of(2)).get(), BytesValue.of(2));
assertEquals(store.get(BytesValue.of(3)).get(), BytesValue.of(3));
assertEquals(store.get(BytesValue.of(4)), Optional.empty());
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1));
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2));
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3));
assertThat(store.get(BytesValue.of(4))).isEmpty();
}
@Test
@@ -301,8 +319,8 @@ public abstract class AbstractKeyValueStorageTest {
tx1.commit();
tx2.commit();
assertEquals(store.get(BytesValue.of(1)).get(), BytesValue.of(1));
assertEquals(store.get(BytesValue.of(2)).get(), BytesValue.of(2));
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1));
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2));
}
@Test
@@ -342,8 +360,8 @@ public abstract class AbstractKeyValueStorageTest {
finalValues[i] = store.get(key).get();
}
Arrays.fill(expectedValues, 0, keyCount, finalValues[0]);
assertArrayEquals(expectedValues, finalValues);
assertTrue(finalValues[0].equals(a) || finalValues[0].equals(b));
assertThat(finalValues).containsExactly(expectedValues);
assertThat(finalValues[0].equals(a) || finalValues[0].equals(b)).isTrue();
store.close();
}