From 23f5e2e93382b6c59e0959b6854283c4a3b96a59 Mon Sep 17 00:00:00 2001 From: garyschulte Date: Fri, 22 Apr 2022 17:42:59 +0200 Subject: [PATCH] wrap rocksdb segmenthandles in atomicreference to ensure we do not reference closed handles (#3734) Signed-off-by: garyschulte --- .../RocksDBColumnarKeyValueStorage.java | 68 ++++++++++--------- .../RocksDBColumnarKeyValueStorageTest.java | 32 +++++---- .../kvstore/SegmentedKeyValueStorage.java | 5 +- .../SegmentedKeyValueStorageAdapter.java | 21 +++--- 4 files changed, 68 insertions(+), 58 deletions(-) diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java index 181eebef0..0a37084da 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java @@ -31,16 +31,17 @@ import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransaction import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.collect.ImmutableMap; import org.apache.tuweni.bytes.Bytes; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.ColumnFamilyDescriptor; @@ -62,19 +63,19 @@ import org.slf4j.LoggerFactory; public class RocksDBColumnarKeyValueStorage implements SegmentedKeyValueStorage { - static { - RocksDbUtil.loadNativeLibrary(); - } - private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class); private static final String DEFAULT_COLUMN = "default"; private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device"; + static { + RocksDbUtil.loadNativeLibrary(); + } + private final DBOptions options; private final TransactionDBOptions txOptions; private final TransactionDB db; private final AtomicBoolean closed = new AtomicBoolean(false); - private final Map columnHandlesByName; + private final Map> columnHandlesByName; private final RocksDBMetrics metrics; private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true); @@ -127,14 +128,17 @@ public class RocksDBColumnarKeyValueStorage Collectors.toMap( segment -> Bytes.wrap(segment.getId()), SegmentIdentifier::getName)); - columnHandlesByName = new HashMap<>(); + final ImmutableMap.Builder> builder = + ImmutableMap.builder(); for (ColumnFamilyHandle columnHandle : columnHandles) { final String segmentName = requireNonNullElse( segmentsById.get(Bytes.wrap(columnHandle.getName())), DEFAULT_COLUMN); - columnHandlesByName.put(segmentName, columnHandle); + builder.put(segmentName, new AtomicReference<>(columnHandle)); } + columnHandlesByName = builder.build(); + } catch (final RocksDBException e) { throw new StorageException(e); } @@ -146,7 +150,8 @@ public class RocksDBColumnarKeyValueStorage } @Override - public ColumnFamilyHandle getSegmentIdentifierByName(final SegmentIdentifier segment) { + public AtomicReference getSegmentIdentifierByName( + final SegmentIdentifier segment) { return columnHandlesByName.get(segment.getName()); } @@ -198,30 +203,27 @@ public class RocksDBColumnarKeyValueStorage } @Override - public ColumnFamilyHandle clear(final ColumnFamilyHandle segmentHandle) { - try { + public void clear(final ColumnFamilyHandle segmentHandle) { - var entry = - columnHandlesByName.entrySet().stream() - .filter(e -> e.getValue().equals(segmentHandle)) - .findAny(); + var entry = + columnHandlesByName.values().stream().filter(e -> e.get().equals(segmentHandle)).findAny(); - if (entry.isPresent()) { - String segmentName = entry.get().getKey(); - ColumnFamilyDescriptor descriptor = - new ColumnFamilyDescriptor( - segmentHandle.getName(), segmentHandle.getDescriptor().getOptions()); - db.dropColumnFamily(segmentHandle); - segmentHandle.close(); - ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor); - columnHandlesByName.put(segmentName, newHandle); - return newHandle; - } - - return segmentHandle; - - } catch (final RocksDBException e) { - throw new StorageException(e); + if (entry.isPresent()) { + AtomicReference segmentHandleRef = entry.get(); + segmentHandleRef.getAndUpdate( + oldHandle -> { + try { + ColumnFamilyDescriptor descriptor = + new ColumnFamilyDescriptor( + segmentHandle.getName(), segmentHandle.getDescriptor().getOptions()); + db.dropColumnFamily(oldHandle); + ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor); + segmentHandle.close(); + return newHandle; + } catch (final RocksDBException e) { + throw new StorageException(e); + } + }); } } @@ -231,7 +233,9 @@ public class RocksDBColumnarKeyValueStorage txOptions.close(); options.close(); tryDeleteOptions.close(); - columnHandlesByName.values().forEach(ColumnFamilyHandle::close); + columnHandlesByName.values().stream() + .map(AtomicReference::get) + .forEach(ColumnFamilyHandle::close); db.close(); } } diff --git a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java index 6e27cb8f4..95da9eda6 100644 --- a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java +++ b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java @@ -31,8 +31,8 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Supplier; import org.junit.Rule; import org.junit.Test; @@ -49,7 +49,10 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT final byte[] val1 = bytesFromHexString("0FFF"); final byte[] val2 = bytesFromHexString("1337"); final SegmentedKeyValueStorage store = createSegmentedStore(); - Supplier segment = () -> store.getSegmentIdentifierByName(TestSegment.FOO); + AtomicReference segment = store.getSegmentIdentifierByName(TestSegment.FOO); + KeyValueStorage duplicateSegmentRef = + new SegmentedKeyValueStorageAdapter<>(TestSegment.FOO, store); + final Consumer insert = value -> { final Transaction tx = store.startTransaction(); @@ -59,18 +62,18 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT // insert val: insert.accept(val1); - final Optional result = store.get(segment.get(), key); - assertThat(result.orElse(null)).isEqualTo(val1); + assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val1); + assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val1); // clear and assert empty: store.clear(segment.get()); - final Optional truncResult = store.get(segment.get(), key); - assertThat(truncResult).isEmpty(); + assertThat(store.get(segment.get(), key)).isEmpty(); + assertThat(duplicateSegmentRef.get(key)).isEmpty(); // insert into empty: insert.accept(val2); - final Optional nextResult = store.get(segment.get(), key); - assertThat(nextResult.orElse(null)).isEqualTo(val2); + assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val2); + assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val2); store.close(); } @@ -81,13 +84,14 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT final Transaction tx = store.startTransaction(); tx.put( - store.getSegmentIdentifierByName(TestSegment.BAR), + store.getSegmentIdentifierByName(TestSegment.BAR).get(), bytesFromHexString("0001"), bytesFromHexString("0FFF")); tx.commit(); final Optional result = - store.get(store.getSegmentIdentifierByName(TestSegment.FOO), bytesFromHexString("0001")); + store.get( + store.getSegmentIdentifierByName(TestSegment.FOO).get(), bytesFromHexString("0001")); assertThat(result).isEmpty(); @@ -100,8 +104,8 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT // properly for (int i = 0; i < 50; i++) { final SegmentedKeyValueStorage store = createSegmentedStore(); - final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO); - final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR); + final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get(); + final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get(); final Transaction tx = store.startTransaction(); tx.put(fooSegment, bytesOf(1), bytesOf(1)); @@ -144,8 +148,8 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT @Test public void canGetThroughSegmentIteration() throws Exception { final SegmentedKeyValueStorage store = createSegmentedStore(); - final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO); - final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR); + final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get(); + final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get(); final Transaction tx = store.startTransaction(); tx.put(fooSegment, bytesOf(1), bytesOf(1)); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java index aa7f02247..a09b1a898 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java @@ -20,6 +20,7 @@ import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; import java.io.Closeable; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Stream; @@ -30,7 +31,7 @@ import java.util.stream.Stream; */ public interface SegmentedKeyValueStorage extends Closeable { - S getSegmentIdentifierByName(SegmentIdentifier segment); + AtomicReference getSegmentIdentifierByName(SegmentIdentifier segment); /** * Get the value from the associated segment and key. @@ -74,7 +75,7 @@ public interface SegmentedKeyValueStorage extends Closeable { Set getAllKeysThat(S segmentHandle, Predicate returnCondition); - S clear(S segmentHandle); + void clear(S segmentHandle); /** * Represents a set of changes to be committed atomically. A single transaction is not diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java index f2d7c2d07..1fc656278 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java @@ -22,47 +22,48 @@ import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; import java.io.IOException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Stream; public class SegmentedKeyValueStorageAdapter implements KeyValueStorage { - private S segmentHandle; + private final AtomicReference segmentHandle; private final SegmentedKeyValueStorage storage; public SegmentedKeyValueStorageAdapter( final SegmentIdentifier segment, final SegmentedKeyValueStorage storage) { - this.segmentHandle = storage.getSegmentIdentifierByName(segment); + segmentHandle = storage.getSegmentIdentifierByName(segment); this.storage = storage; } @Override public void clear() { - segmentHandle = storage.clear(segmentHandle); + storage.clear(segmentHandle.get()); } @Override public boolean containsKey(final byte[] key) throws StorageException { - return storage.containsKey(segmentHandle, key); + return storage.containsKey(segmentHandle.get(), key); } @Override public Optional get(final byte[] key) throws StorageException { - return storage.get(segmentHandle, key); + return storage.get(segmentHandle.get(), key); } @Override public Set getAllKeysThat(final Predicate returnCondition) { - return storage.getAllKeysThat(segmentHandle, returnCondition); + return storage.getAllKeysThat(segmentHandle.get(), returnCondition); } @Override public Stream streamKeys() { - return storage.streamKeys(segmentHandle); + return storage.streamKeys(segmentHandle.get()); } @Override public boolean tryDelete(final byte[] key) { - return storage.tryDelete(segmentHandle, key); + return storage.tryDelete(segmentHandle.get(), key); } @Override @@ -77,12 +78,12 @@ public class SegmentedKeyValueStorageAdapter implements KeyValueStorage { @Override public void put(final byte[] key, final byte[] value) { - transaction.put(segmentHandle, key, value); + transaction.put(segmentHandle.get(), key, value); } @Override public void remove(final byte[] key) { - transaction.remove(segmentHandle, key); + transaction.remove(segmentHandle.get(), key); } @Override