mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-08 23:17:54 -05:00
Healing Mechanism for Flat Database in Besu (#5319)
The proposed pull request introduces a feature that allows healing of the flat database by streaming the flat database data and validating it by generating a proof from the trie structure. If the proof is found to be invalid, the code traverses the trie to fix the invalid range. To optimize the process and avoid checking the entire flat database, the PR includes enhancements such as tracking the accounts that need to be repaired during SnapSync. By implementing these optimizations, the PR aims to significantly reduce the time and resources required for repairing the flat database. Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
This commit is contained in:
@@ -120,6 +120,11 @@ public class InMemoryKeyValueStorage
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Pair<byte[], byte[]>> streamFromKey(final byte[] startKey) {
|
||||
return stream().filter(e -> Bytes.wrap(startKey).compareTo(Bytes.wrap(e.getKey())) <= 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<byte[]> streamKeys() {
|
||||
final Lock lock = rwLock.readLock();
|
||||
|
||||
@@ -110,6 +110,11 @@ public class LayeredKeyValueStorage extends InMemoryKeyValueStorage
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Pair<byte[], byte[]>> streamFromKey(final byte[] startKey) {
|
||||
return stream().filter(e -> Bytes.wrap(startKey).compareTo(Bytes.wrap(e.getKey())) <= 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<byte[]> streamKeys() {
|
||||
throwIfClosed();
|
||||
|
||||
@@ -113,6 +113,11 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Pair<byte[], byte[]>> streamFromKey(final byte[] startKey) {
|
||||
return stream().filter(e -> Bytes.wrap(startKey).compareTo(Bytes.wrap(e.getKey())) <= 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<byte[]> streamKeys() {
|
||||
final Lock lock = rwLock.readLock();
|
||||
|
||||
@@ -78,6 +78,17 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
|
||||
*/
|
||||
Stream<Pair<byte[], byte[]>> stream(final S segmentHandle);
|
||||
|
||||
/**
|
||||
* Returns a stream of key-value pairs starting from the specified key. This method is used to
|
||||
* retrieve a stream of data from the storage, starting from the given key. If no data is
|
||||
* available from the specified key onwards, an empty stream is returned.
|
||||
*
|
||||
* @param segmentHandle The segment handle whose keys we want to stream.
|
||||
* @param startKey The key from which the stream should start.
|
||||
* @return A stream of key-value pairs starting from the specified key.
|
||||
*/
|
||||
Stream<Pair<byte[], byte[]>> streamFromKey(final S segmentHandle, final byte[] startKey);
|
||||
|
||||
/**
|
||||
* Stream keys.
|
||||
*
|
||||
|
||||
@@ -88,6 +88,11 @@ public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
|
||||
return storage.stream(segmentHandle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Pair<byte[], byte[]>> streamFromKey(final byte[] startKey) throws StorageException {
|
||||
return storage.streamFromKey(segmentHandle, startKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<byte[]> streamKeys() {
|
||||
throwIfClosed();
|
||||
|
||||
@@ -169,7 +169,7 @@ public class Pipeline<I> {
|
||||
if (tracingEnabled) {
|
||||
taskSpan.setStatus(StatusCode.ERROR);
|
||||
}
|
||||
LOG.debug("Unhandled exception in pipeline. Aborting.", t);
|
||||
LOG.info("Unhandled exception in pipeline. Aborting.", t);
|
||||
try {
|
||||
abort(t);
|
||||
} catch (final Throwable t2) {
|
||||
|
||||
Reference in New Issue
Block a user