mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-08 15:13:58 -05:00
Fix possible incomplete txpool restore from dump file (#7991)
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
This commit is contained in:
@@ -22,6 +22,7 @@
|
||||
### Bug fixes
|
||||
- Fix serialization of state overrides when `movePrecompileToAddress` is present [#8204](https://github.com/hyperledger/besu/pull/8024)
|
||||
- Revise the approach for setting level_compaction_dynamic_level_bytes RocksDB configuration option [#8037](https://github.com/hyperledger/besu/pull/8037)
|
||||
- Fix possible incomplete txpool restore from dump file [#7991](https://github.com/hyperledger/besu/pull/7991)
|
||||
|
||||
## 24.12.2 Hotfix
|
||||
|
||||
|
||||
@@ -54,11 +54,11 @@ import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.IntSummaryStatistics;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
@@ -73,6 +73,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
@@ -645,15 +646,16 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
isPoolEnabled.set(false);
|
||||
subscribeConnectId.ifPresent(ethContext.getEthPeers()::unsubscribeConnect);
|
||||
pendingTransactionsListenersProxy.unsubscribe();
|
||||
final PendingTransactions pendingTransactionsToSave = pendingTransactions;
|
||||
final CompletableFuture<Void> saveOperation =
|
||||
saveRestoreManager
|
||||
.saveToDisk(pendingTransactions)
|
||||
.exceptionally(
|
||||
t -> {
|
||||
LOG.error("Error while saving transaction pool to disk", t);
|
||||
return null;
|
||||
});
|
||||
pendingTransactions = new DisabledPendingTransactions();
|
||||
return saveRestoreManager
|
||||
.saveToDisk(pendingTransactionsToSave)
|
||||
.exceptionally(
|
||||
t -> {
|
||||
LOG.error("Error while saving transaction pool to disk", t);
|
||||
return null;
|
||||
});
|
||||
return saveOperation;
|
||||
}
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
@@ -750,6 +752,7 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
|
||||
|
||||
CompletableFuture<Void> saveToDisk(final PendingTransactions pendingTransactionsToSave) {
|
||||
cancelInProgressReadOperation();
|
||||
return serializeAndDedupOperation(
|
||||
() -> executeSaveToDisk(pendingTransactionsToSave), writeInProgress);
|
||||
}
|
||||
@@ -758,20 +761,31 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
return serializeAndDedupOperation(this::executeLoadFromDisk, readInProgress);
|
||||
}
|
||||
|
||||
private void cancelInProgressReadOperation() {
|
||||
if (!readInProgress.get().isDone()) {
|
||||
LOG.debug("Cancelling in progress read operation");
|
||||
isCancelled.set(true);
|
||||
try {
|
||||
waitUntilReadOperationIsCancelled();
|
||||
LOG.debug("In progress read operation cancelled");
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.warn("Error while cancelling in progress read operation", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void waitUntilReadOperationIsCancelled()
|
||||
throws InterruptedException, ExecutionException {
|
||||
readInProgress.get().get();
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> serializeAndDedupOperation(
|
||||
final Runnable operation,
|
||||
final AtomicReference<CompletableFuture<Void>> operationInProgress) {
|
||||
if (configuration.getEnableSaveRestore()) {
|
||||
try {
|
||||
if (diskAccessLock.tryAcquire(1, TimeUnit.MINUTES)) {
|
||||
if (!operationInProgress.get().isDone()) {
|
||||
isCancelled.set(true);
|
||||
try {
|
||||
operationInProgress.get().get();
|
||||
} catch (ExecutionException ee) {
|
||||
// nothing to do
|
||||
}
|
||||
}
|
||||
|
||||
isCancelled.set(false);
|
||||
operationInProgress.set(
|
||||
@@ -791,12 +805,17 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
|
||||
private void executeSaveToDisk(final PendingTransactions pendingTransactionsToSave) {
|
||||
final File saveFile = configuration.getSaveFile();
|
||||
final boolean appending = saveFile.exists();
|
||||
try (final BufferedWriter bw =
|
||||
new BufferedWriter(new FileWriter(saveFile, StandardCharsets.US_ASCII))) {
|
||||
new BufferedWriter(new FileWriter(saveFile, StandardCharsets.US_ASCII, appending))) {
|
||||
final var allTxs = pendingTransactionsToSave.getPendingTransactions();
|
||||
LOG.info("Saving {} transactions to file {}", allTxs.size(), saveFile);
|
||||
LOG.info(
|
||||
"{} {} transactions to file {}",
|
||||
appending ? "Appending" : "Saving",
|
||||
allTxs.size(),
|
||||
saveFile);
|
||||
|
||||
final long savedTxs =
|
||||
final long processedTxCount =
|
||||
allTxs.parallelStream()
|
||||
.takeWhile(unused -> !isCancelled.get())
|
||||
.map(
|
||||
@@ -819,13 +838,19 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
return 1;
|
||||
})
|
||||
.sum();
|
||||
|
||||
if (isCancelled.get()) {
|
||||
LOG.info(
|
||||
"Saved {} transactions to file {}, before operation was cancelled",
|
||||
savedTxs,
|
||||
"{} {} transactions to file {}, before operation was cancelled",
|
||||
appending ? "Appended" : "Saved",
|
||||
processedTxCount,
|
||||
saveFile);
|
||||
} else {
|
||||
LOG.info("Saved {} transactions to file {}", savedTxs, saveFile);
|
||||
LOG.info(
|
||||
"{} {} transactions to file {}",
|
||||
appending ? "Appended" : "Saved",
|
||||
processedTxCount,
|
||||
saveFile);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while saving txpool content to disk", e);
|
||||
@@ -839,10 +864,10 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
LOG.info("Loading transaction pool content from file {}", saveFile);
|
||||
try (final BufferedReader br =
|
||||
new BufferedReader(new FileReader(saveFile, StandardCharsets.US_ASCII))) {
|
||||
final IntSummaryStatistics stats =
|
||||
final Map<String, Long> stats =
|
||||
br.lines()
|
||||
.takeWhile(unused -> !isCancelled.get())
|
||||
.mapToInt(
|
||||
.map(
|
||||
line -> {
|
||||
final boolean isLocal = line.charAt(0) == 'l';
|
||||
final Transaction tx =
|
||||
@@ -850,30 +875,66 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
|
||||
final ValidationResult<TransactionInvalidReason> result =
|
||||
addTransaction(tx, isLocal);
|
||||
|
||||
return result.isValid() ? 1 : 0;
|
||||
return result.isValid() ? "OK" : result.getInvalidReason().name();
|
||||
})
|
||||
.summaryStatistics();
|
||||
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
|
||||
|
||||
br.close();
|
||||
|
||||
final var added = stats.getOrDefault("OK", 0L);
|
||||
final var processedLines = stats.values().stream().mapToLong(Long::longValue).sum();
|
||||
|
||||
LOG.debug("Restored transactions stats {}", stats);
|
||||
|
||||
if (isCancelled.get()) {
|
||||
LOG.info(
|
||||
"Added {} transactions of {} loaded from file {}, before operation was cancelled",
|
||||
stats.getSum(),
|
||||
stats.getCount(),
|
||||
added,
|
||||
processedLines,
|
||||
saveFile);
|
||||
removeProcessedLines(saveFile, processedLines);
|
||||
} else {
|
||||
LOG.info(
|
||||
"Added {} transactions of {} loaded from file {}",
|
||||
stats.getSum(),
|
||||
stats.getCount(),
|
||||
"Added {} transactions of {} loaded from file {}, deleting file",
|
||||
added,
|
||||
processedLines,
|
||||
saveFile);
|
||||
saveFile.delete();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while saving txpool content to disk", e);
|
||||
}
|
||||
}
|
||||
saveFile.delete();
|
||||
}
|
||||
}
|
||||
|
||||
private void removeProcessedLines(final File saveFile, final long processedLines)
|
||||
throws IOException {
|
||||
|
||||
LOG.debug("Removing processed lines from save file");
|
||||
|
||||
final var tmp = File.createTempFile(saveFile.getName(), ".tmp");
|
||||
|
||||
try (final BufferedReader reader =
|
||||
Files.newBufferedReader(saveFile.toPath(), StandardCharsets.US_ASCII);
|
||||
final BufferedWriter writer =
|
||||
Files.newBufferedWriter(tmp.toPath(), StandardCharsets.US_ASCII)) {
|
||||
reader
|
||||
.lines()
|
||||
.skip(processedLines)
|
||||
.forEach(
|
||||
line -> {
|
||||
try {
|
||||
writer.write(line);
|
||||
writer.newLine();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
saveFile.delete();
|
||||
Files.move(tmp.toPath(), saveFile.toPath());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user