mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-08 23:17:54 -05:00
Prevent rocksdb segfaults when accessing closed storage (#5527)
* add isClosed check to Transaction decorator to prevent segfaults on shutdown Signed-off-by: garyschulte <garyschulte@gmail.com>
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
- Unite the tx-pool CLI options under the same Tx Pool Options group in UX. [#5466](https://github.com/hyperledger/besu/issues/5466)
|
||||
|
||||
### Bug Fixes
|
||||
- check to ensure storage and transactions are not closed prior to reading/writing [#5527](https://github.com/hyperledger/besu/pull/5527)
|
||||
|
||||
### Download Links
|
||||
|
||||
|
||||
@@ -77,14 +77,23 @@ public abstract class ExecutionEngineJsonRpcMethod implements JsonRpcMethod {
|
||||
cf.complete(
|
||||
resp.otherwise(
|
||||
t -> {
|
||||
LOG.error(
|
||||
String.format("failed to exec consensus method %s", this.getName()),
|
||||
t);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.atDebug()
|
||||
.setMessage("failed to exec consensus method {}")
|
||||
.addArgument(this.getName())
|
||||
.setCause(t)
|
||||
.log();
|
||||
} else {
|
||||
LOG.atError()
|
||||
.setMessage("failed to exec consensus method {}, error: {}")
|
||||
.addArgument(this.getName())
|
||||
.addArgument(t.getMessage())
|
||||
.log();
|
||||
}
|
||||
return new JsonRpcErrorResponse(
|
||||
request.getRequest().getId(), JsonRpcError.INVALID_REQUEST);
|
||||
})
|
||||
.result()));
|
||||
|
||||
try {
|
||||
return cf.get();
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
@@ -69,7 +69,7 @@ Calculated : ${currentHash}
|
||||
tasks.register('checkAPIChanges', FileStateChecker) {
|
||||
description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
|
||||
files = sourceSets.main.allJava.files
|
||||
knownHash = 'gfED3Pzd/+CSSrTtDo1X+lkc7qwffDqF98EZIDMCOIc='
|
||||
knownHash = 'V3sh575rrexPv+Ywe8mURT4Z3fREDCDd79PpAISFx8A='
|
||||
}
|
||||
check.dependsOn('checkAPIChanges')
|
||||
|
||||
|
||||
@@ -114,4 +114,11 @@ public interface KeyValueStorage extends Closeable {
|
||||
* @throws StorageException problem encountered when starting a new transaction.
|
||||
*/
|
||||
KeyValueStorageTransaction startTransaction() throws StorageException;
|
||||
|
||||
/**
|
||||
* Return Whether the underlying storage is closed.
|
||||
*
|
||||
* @return boolean indicating whether the storage is closed.
|
||||
*/
|
||||
boolean isClosed();
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ public class OptimisticRocksDBColumnarKeyValueStorage extends RocksDBColumnarKey
|
||||
final WriteOptions writeOptions = new WriteOptions();
|
||||
writeOptions.setIgnoreMissingColumnFamilies(true);
|
||||
return new SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<>(
|
||||
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions));
|
||||
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions), this.closed::get);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -108,6 +108,11 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage {
|
||||
return snapTx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return closed.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
throw new UnsupportedOperationException(
|
||||
|
||||
@@ -87,7 +87,9 @@ public abstract class RocksDBColumnarKeyValueStorage
|
||||
RocksDbUtil.loadNativeLibrary();
|
||||
}
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
/** atomic boolean to track if the storage is closed */
|
||||
protected final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
private final WriteOptions tryDeleteOptions =
|
||||
new WriteOptions().setNoSlowdown(true).setIgnoreMissingColumnFamilies(true);
|
||||
private final ReadOptions readOptions = new ReadOptions().setVerifyChecksums(false);
|
||||
@@ -328,6 +330,11 @@ public abstract class RocksDBColumnarKeyValueStorage
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return closed.get();
|
||||
}
|
||||
|
||||
void throwIfClosed() {
|
||||
if (closed.get()) {
|
||||
LOG.error("Attempting to use a closed RocksDbKeyValueStorage");
|
||||
|
||||
@@ -93,10 +93,7 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
* @return the optional data
|
||||
*/
|
||||
public Optional<byte[]> get(final byte[] key) {
|
||||
if (isClosed.get()) {
|
||||
LOG.debug("Attempted to access closed snapshot");
|
||||
return Optional.empty();
|
||||
}
|
||||
throwIfClosed();
|
||||
|
||||
try (final OperationTimer.TimingContext ignored = metrics.getReadLatency().startTimer()) {
|
||||
return Optional.ofNullable(snapTx.get(columnFamilyHandle, readOptions, key));
|
||||
@@ -107,10 +104,7 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
|
||||
@Override
|
||||
public void put(final byte[] key, final byte[] value) {
|
||||
if (isClosed.get()) {
|
||||
LOG.debug("Attempted to access closed snapshot");
|
||||
return;
|
||||
}
|
||||
throwIfClosed();
|
||||
|
||||
try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) {
|
||||
snapTx.put(columnFamilyHandle, key, value);
|
||||
@@ -125,10 +119,8 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
|
||||
@Override
|
||||
public void remove(final byte[] key) {
|
||||
if (isClosed.get()) {
|
||||
LOG.debug("Attempted to access closed snapshot");
|
||||
return;
|
||||
}
|
||||
throwIfClosed();
|
||||
|
||||
try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) {
|
||||
snapTx.delete(columnFamilyHandle, key);
|
||||
} catch (final RocksDBException e) {
|
||||
@@ -146,6 +138,8 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
* @return the stream
|
||||
*/
|
||||
public Stream<Pair<byte[], byte[]>> stream() {
|
||||
throwIfClosed();
|
||||
|
||||
final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
|
||||
rocksIterator.seekToFirst();
|
||||
return RocksDbIterator.create(rocksIterator).toStream();
|
||||
@@ -157,6 +151,8 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
* @return the stream
|
||||
*/
|
||||
public Stream<byte[]> streamKeys() {
|
||||
throwIfClosed();
|
||||
|
||||
final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
|
||||
rocksIterator.seekToFirst();
|
||||
return RocksDbIterator.create(rocksIterator).toStreamKeys();
|
||||
@@ -169,6 +165,8 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
|
||||
@Override
|
||||
public void rollback() {
|
||||
throwIfClosed();
|
||||
|
||||
try {
|
||||
snapTx.rollback();
|
||||
metrics.getRollbackCount().inc();
|
||||
@@ -189,9 +187,7 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
* @return the rocks db snapshot transaction
|
||||
*/
|
||||
public RocksDBSnapshotTransaction copy() {
|
||||
if (isClosed.get()) {
|
||||
throw new StorageException("Snapshot already closed");
|
||||
}
|
||||
throwIfClosed();
|
||||
try {
|
||||
var copyReadOptions = new ReadOptions().setSnapshot(snapshot.markAndUseSnapshot());
|
||||
var copySnapTx = db.beginTransaction(writeOptions);
|
||||
@@ -213,4 +209,11 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A
|
||||
snapshot.unMarkSnapshot();
|
||||
isClosed.set(true);
|
||||
}
|
||||
|
||||
private void throwIfClosed() {
|
||||
if (isClosed.get()) {
|
||||
LOG.error("Attempting to use a closed RocksDBSnapshotTransaction");
|
||||
throw new StorageException("Storage has already been closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,6 +86,6 @@ public class TransactionDBRocksDBColumnarKeyValueStorage extends RocksDBColumnar
|
||||
final WriteOptions writeOptions = new WriteOptions();
|
||||
writeOptions.setIgnoreMissingColumnFamilies(true);
|
||||
return new SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<>(
|
||||
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions));
|
||||
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions), this.closed::get);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,6 +97,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
|
||||
@Override
|
||||
public void clear() throws StorageException {
|
||||
throwIfClosed();
|
||||
|
||||
try (final RocksIterator rocksIterator = db.newIterator()) {
|
||||
rocksIterator.seekToFirst();
|
||||
if (rocksIterator.isValid()) {
|
||||
@@ -141,6 +142,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
|
||||
@Override
|
||||
public Stream<Pair<byte[], byte[]>> stream() {
|
||||
throwIfClosed();
|
||||
|
||||
final RocksIterator rocksIterator = db.newIterator();
|
||||
rocksIterator.seekToFirst();
|
||||
return RocksDbIterator.create(rocksIterator).toStream();
|
||||
@@ -149,6 +151,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
|
||||
@Override
|
||||
public Stream<byte[]> streamKeys() {
|
||||
throwIfClosed();
|
||||
|
||||
final RocksIterator rocksIterator = db.newIterator();
|
||||
rocksIterator.seekToFirst();
|
||||
return RocksDbIterator.create(rocksIterator).toStreamKeys();
|
||||
@@ -186,6 +189,11 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
|
||||
new RocksDBTransaction(db.beginTransaction(options), options, rocksDBMetrics));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return closed.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
|
||||
@@ -154,6 +154,11 @@ public class InMemoryKeyValueStorage
|
||||
return new KeyValueStorageTransactionTransitionValidatorDecorator(new InMemoryTransaction());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Key set.
|
||||
*
|
||||
|
||||
@@ -30,11 +30,15 @@ import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Streams;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.tuweni.bytes.Bytes;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/** Key value storage which stores in memory all updates to a parent worldstate storage. */
|
||||
public class LayeredKeyValueStorage extends InMemoryKeyValueStorage
|
||||
implements SnappedKeyValueStorage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LayeredKeyValueStorage.class);
|
||||
|
||||
private final KeyValueStorage parent;
|
||||
|
||||
/**
|
||||
@@ -65,6 +69,8 @@ public class LayeredKeyValueStorage extends InMemoryKeyValueStorage
|
||||
|
||||
@Override
|
||||
public Optional<byte[]> get(final byte[] key) throws StorageException {
|
||||
throwIfClosed();
|
||||
|
||||
final Lock lock = rwLock.readLock();
|
||||
lock.lock();
|
||||
try {
|
||||
@@ -82,6 +88,8 @@ public class LayeredKeyValueStorage extends InMemoryKeyValueStorage
|
||||
|
||||
@Override
|
||||
public Stream<Pair<byte[], byte[]>> stream() {
|
||||
throwIfClosed();
|
||||
|
||||
final Lock lock = rwLock.readLock();
|
||||
lock.lock();
|
||||
try {
|
||||
@@ -104,6 +112,8 @@ public class LayeredKeyValueStorage extends InMemoryKeyValueStorage
|
||||
|
||||
@Override
|
||||
public Stream<byte[]> streamKeys() {
|
||||
throwIfClosed();
|
||||
|
||||
final Lock lock = rwLock.readLock();
|
||||
lock.lock();
|
||||
try {
|
||||
@@ -131,6 +141,8 @@ public class LayeredKeyValueStorage extends InMemoryKeyValueStorage
|
||||
|
||||
@Override
|
||||
public KeyValueStorageTransaction startTransaction() {
|
||||
throwIfClosed();
|
||||
|
||||
return new KeyValueStorageTransactionTransitionValidatorDecorator(
|
||||
new InMemoryTransaction() {
|
||||
@Override
|
||||
@@ -151,8 +163,20 @@ public class LayeredKeyValueStorage extends InMemoryKeyValueStorage
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return parent.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SnappedKeyValueStorage clone() {
|
||||
return new LayeredKeyValueStorage(hashValueStore, parent);
|
||||
}
|
||||
|
||||
private void throwIfClosed() {
|
||||
if (parent.isClosed()) {
|
||||
LOG.error("Attempting to use a closed RocksDBKeyValueStorage");
|
||||
throw new StorageException("Storage has been closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,6 +144,11 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage {
|
||||
return new KeyValueStorageTransactionTransitionValidatorDecorator(new MemoryTransaction());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private class MemoryTransaction implements KeyValueStorageTransaction {
|
||||
|
||||
private Map<Bytes, byte[]> updatedValues = new HashMap<>();
|
||||
|
||||
@@ -123,6 +123,13 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
|
||||
*/
|
||||
void clear(S segmentHandle);
|
||||
|
||||
/**
|
||||
* Whether the underlying storage is closed.
|
||||
*
|
||||
* @return boolean indicating whether the underlying storage is closed.
|
||||
*/
|
||||
boolean isClosed();
|
||||
|
||||
/**
|
||||
* Represents a set of changes to be committed atomically. A single transaction is not
|
||||
* thread-safe, but multiple transactions can execute concurrently.
|
||||
|
||||
@@ -26,6 +26,8 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The type Segmented key value storage adapter.
|
||||
@@ -34,6 +36,7 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
*/
|
||||
public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SegmentedKeyValueStorageAdapter.class);
|
||||
private final S segmentHandle;
|
||||
private final SegmentedKeyValueStorage<S> storage;
|
||||
|
||||
@@ -51,41 +54,49 @@ public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
throwIfClosed();
|
||||
storage.clear(segmentHandle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsKey(final byte[] key) throws StorageException {
|
||||
throwIfClosed();
|
||||
return storage.containsKey(segmentHandle, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<byte[]> get(final byte[] key) throws StorageException {
|
||||
throwIfClosed();
|
||||
return storage.get(segmentHandle, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) {
|
||||
throwIfClosed();
|
||||
return storage.getAllKeysThat(segmentHandle, returnCondition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) {
|
||||
throwIfClosed();
|
||||
return storage.getAllValuesFromKeysThat(segmentHandle, returnCondition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Pair<byte[], byte[]>> stream() {
|
||||
throwIfClosed();
|
||||
return storage.stream(segmentHandle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<byte[]> streamKeys() {
|
||||
throwIfClosed();
|
||||
return storage.streamKeys(segmentHandle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryDelete(final byte[] key) {
|
||||
throwIfClosed();
|
||||
return storage.tryDelete(segmentHandle, key);
|
||||
}
|
||||
|
||||
@@ -101,23 +112,39 @@ public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
|
||||
|
||||
@Override
|
||||
public void put(final byte[] key, final byte[] value) {
|
||||
throwIfClosed();
|
||||
transaction.put(segmentHandle, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(final byte[] key) {
|
||||
throwIfClosed();
|
||||
transaction.remove(segmentHandle, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() throws StorageException {
|
||||
throwIfClosed();
|
||||
transaction.commit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollback() {
|
||||
throwIfClosed();
|
||||
transaction.rollback();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return storage.isClosed();
|
||||
}
|
||||
|
||||
private void throwIfClosed() {
|
||||
if (storage.isClosed()) {
|
||||
LOG.error("Attempting to use a closed Storage instance.");
|
||||
throw new StorageException("Storage has been closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ import static com.google.common.base.Preconditions.checkState;
|
||||
import org.hyperledger.besu.plugin.services.exception.StorageException;
|
||||
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage.Transaction;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* The Segmented key value storage transaction transition validator decorator.
|
||||
*
|
||||
@@ -28,33 +30,39 @@ public class SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<S>
|
||||
implements Transaction<S> {
|
||||
|
||||
private final Transaction<S> transaction;
|
||||
private final Supplier<Boolean> isClosed;
|
||||
private boolean active = true;
|
||||
|
||||
/**
|
||||
* Instantiates a new Segmented key value storage transaction transition validator decorator.
|
||||
*
|
||||
* @param toDecorate the to decorate
|
||||
* @param isClosed supplier that returns true if the storage is closed
|
||||
*/
|
||||
public SegmentedKeyValueStorageTransactionTransitionValidatorDecorator(
|
||||
final Transaction<S> toDecorate) {
|
||||
final Transaction<S> toDecorate, final Supplier<Boolean> isClosed) {
|
||||
this.transaction = toDecorate;
|
||||
this.isClosed = isClosed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void put(final S segment, final byte[] key, final byte[] value) {
|
||||
checkState(active, "Cannot invoke put() on a completed transaction.");
|
||||
checkState(!isClosed.get(), "Cannot invoke put() on a closed storage.");
|
||||
transaction.put(segment, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void remove(final S segment, final byte[] key) {
|
||||
checkState(active, "Cannot invoke remove() on a completed transaction.");
|
||||
checkState(!isClosed.get(), "Cannot invoke remove() on a closed storage.");
|
||||
transaction.remove(segment, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void commit() throws StorageException {
|
||||
checkState(active, "Cannot commit a completed transaction.");
|
||||
checkState(!isClosed.get(), "Cannot invoke commit() on a closed storage.");
|
||||
active = false;
|
||||
transaction.commit();
|
||||
}
|
||||
@@ -62,6 +70,7 @@ public class SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<S>
|
||||
@Override
|
||||
public final void rollback() {
|
||||
checkState(active, "Cannot rollback a completed transaction.");
|
||||
checkState(!isClosed.get(), "Cannot invoke rollback() on a closed storage.");
|
||||
active = false;
|
||||
transaction.rollback();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user