mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-09 15:37:54 -05:00
Process onBlockAdded event asyncronously (#5909)
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
This commit is contained in:
@@ -12,6 +12,7 @@
|
||||
- Add access to an immutable world view to start/end transaction hooks in the tracing API[#5836](https://github.com/hyperledger/besu/pull/5836)
|
||||
- Layered transaction pool implementation is now stable and enabled by default. If you want still to use the legacy implementation, use `--tx-pool=legacy` [#5772](https://github.com/hyperledger/besu)
|
||||
- Tune G1GC to reduce Besu memory footprint, and new `besu-untuned` start scripts to run without any specific G1GC flags [#5879](https://github.com/hyperledger/besu/pull/5879)
|
||||
- Reduce `engine_forkchoiceUpdatedV?` response time by asynchronously process block added events in the transaction pool [#5909](https://github.com/hyperledger/besu/pull/5909)
|
||||
|
||||
### Bug Fixes
|
||||
- do not create ignorable storage on revert storage-variables subcommand [#5830](https://github.com/hyperledger/besu/pull/5830)
|
||||
|
||||
@@ -83,7 +83,7 @@ public class EthScheduler {
|
||||
metricsSystem),
|
||||
MonitoredExecutors.newCachedThreadPool(
|
||||
EthScheduler.class.getSimpleName() + "-Services", metricsSystem),
|
||||
MonitoredExecutors.newBoundedThreadPool(
|
||||
MonitoredExecutors.newFixedThreadPool(
|
||||
EthScheduler.class.getSimpleName() + "-Computation",
|
||||
1,
|
||||
computationWorkerCount,
|
||||
@@ -133,6 +133,10 @@ public class EthScheduler {
|
||||
txWorkerExecutor.execute(command);
|
||||
}
|
||||
|
||||
public <T> CompletableFuture<T> scheduleServiceTask(final Supplier<T> task) {
|
||||
return CompletableFuture.supplyAsync(task, servicesExecutor);
|
||||
}
|
||||
|
||||
public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
|
||||
final CompletableFuture<T> serviceFuture = task.runAsync(servicesExecutor);
|
||||
pendingFutures.add(serviceFuture);
|
||||
|
||||
@@ -62,7 +62,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
@@ -104,6 +106,8 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
new PendingTransactionsListenersProxy();
|
||||
private volatile OptionalLong subscribeConnectId = OptionalLong.empty();
|
||||
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
|
||||
private final Lock blockAddedLock = new ReentrantLock();
|
||||
private final Queue<BlockAddedEvent> blockAddedQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
public TransactionPool(
|
||||
final Supplier<PendingTransactions> pendingTransactionsSupplier,
|
||||
@@ -321,16 +325,47 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
@Override
|
||||
public void onBlockAdded(final BlockAddedEvent event) {
|
||||
if (isPoolEnabled.get()) {
|
||||
LOG.trace("Block added event {}", event);
|
||||
final long started = System.currentTimeMillis();
|
||||
if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED)
|
||||
|| event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) {
|
||||
|
||||
pendingTransactions.manageBlockAdded(
|
||||
event.getBlock().getHeader(),
|
||||
event.getAddedTransactions(),
|
||||
event.getRemovedTransactions(),
|
||||
protocolSchedule.getByBlockHeader(event.getBlock().getHeader()).getFeeMarket());
|
||||
reAddTransactions(event.getRemovedTransactions());
|
||||
// add the event to the processing queue
|
||||
blockAddedQueue.add(event);
|
||||
|
||||
// we want to process the added block asynchronously,
|
||||
// but at the same time we must ensure that blocks are processed in order one at time
|
||||
ethContext
|
||||
.getScheduler()
|
||||
.scheduleServiceTask(
|
||||
() -> {
|
||||
while (!blockAddedQueue.isEmpty()) {
|
||||
if (blockAddedLock.tryLock()) {
|
||||
// no other thread is processing the queue, so start processing it
|
||||
try {
|
||||
BlockAddedEvent e = blockAddedQueue.poll();
|
||||
// check again since another thread could have stolen our task
|
||||
if (e != null) {
|
||||
pendingTransactions.manageBlockAdded(
|
||||
e.getBlock().getHeader(),
|
||||
e.getAddedTransactions(),
|
||||
e.getRemovedTransactions(),
|
||||
protocolSchedule
|
||||
.getByBlockHeader(e.getBlock().getHeader())
|
||||
.getFeeMarket());
|
||||
reAddTransactions(e.getRemovedTransactions());
|
||||
LOG.atDebug()
|
||||
.setMessage("Block added event {} processed in {}ms")
|
||||
.addArgument(e)
|
||||
.addArgument(() -> System.currentTimeMillis() - started)
|
||||
.log();
|
||||
}
|
||||
} finally {
|
||||
blockAddedLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.nullable;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
@@ -98,6 +99,7 @@ import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -229,6 +231,9 @@ public abstract class AbstractTransactionPoolTest {
|
||||
final EthScheduler ethScheduler = mock(EthScheduler.class);
|
||||
syncTaskCapture = ArgumentCaptor.forClass(Runnable.class);
|
||||
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
|
||||
doAnswer(invocation -> ((Supplier<Void>) invocation.getArguments()[0]).get())
|
||||
.when(ethScheduler)
|
||||
.scheduleServiceTask(any(Supplier.class));
|
||||
doReturn(ethScheduler).when(ethContext).getScheduler();
|
||||
|
||||
peerTransactionTracker = new PeerTransactionTracker();
|
||||
|
||||
Reference in New Issue
Block a user