mirror of
https://github.com/vacp2p/status-linea-besu.git
synced 2026-01-09 22:07:59 -05:00
Disable txpool when not in sync (#6302)
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
- Set Ethereum Classic mainnet activation block for Spiral network upgrade [#6267](https://github.com/hyperledger/besu/pull/6267)
|
||||
- Add custom genesis file name to config overview if specified [#6297](https://github.com/hyperledger/besu/pull/6297)
|
||||
- Update Gradle plugins and replace unmaintained License Gradle Plugin with the actively maintained Gradle License Report [#6275](https://github.com/hyperledger/besu/pull/6275)
|
||||
- Disable transaction handling when the node is not in sync, to avoid unnecessary transaction validation work [#6302](https://github.com/hyperledger/besu/pull/6302)
|
||||
- Optimize RocksDB WAL files, allows for faster restart and a more linear disk space utilization [#6328](https://github.com/hyperledger/besu/pull/6328)
|
||||
|
||||
### Bug fixes
|
||||
|
||||
@@ -41,7 +41,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
|
||||
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
|
||||
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
|
||||
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
|
||||
@@ -64,6 +63,7 @@ import org.hyperledger.besu.plugin.data.LogWithMetadata;
|
||||
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
|
||||
import org.hyperledger.besu.plugin.data.SyncStatus;
|
||||
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
|
||||
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
|
||||
import org.hyperledger.besu.testutil.TestClock;
|
||||
|
||||
import java.math.BigInteger;
|
||||
@@ -100,7 +100,6 @@ public class BesuEventsImplTest {
|
||||
@Mock private EthPeers mockEthPeers;
|
||||
@Mock private EthContext mockEthContext;
|
||||
@Mock private EthMessages mockEthMessages;
|
||||
@Mock private EthScheduler mockEthScheduler;
|
||||
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private TransactionValidatorFactory mockTransactionValidatorFactory;
|
||||
@@ -128,7 +127,7 @@ public class BesuEventsImplTest {
|
||||
|
||||
when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
|
||||
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
|
||||
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
|
||||
when(mockEthContext.getScheduler()).thenReturn(new DeterministicEthScheduler());
|
||||
lenient().when(mockEthPeers.streamAvailablePeers()).thenAnswer(z -> Stream.empty());
|
||||
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
|
||||
lenient().when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
|
||||
|
||||
@@ -18,6 +18,7 @@ import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@@ -107,7 +108,7 @@ public class EthGetFilterChangesIntegrationTest {
|
||||
blockchain::getChainHeadHeader);
|
||||
final ProtocolContext protocolContext = executionContext.getProtocolContext();
|
||||
|
||||
EthContext ethContext = mock(EthContext.class);
|
||||
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
|
||||
EthPeers ethPeers = mock(EthPeers.class);
|
||||
when(ethContext.getEthPeers()).thenReturn(ethPeers);
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@@ -107,7 +108,7 @@ public class EthGetFilterChangesIntegrationTest {
|
||||
blockchain::getChainHeadHeader);
|
||||
final ProtocolContext protocolContext = executionContext.getProtocolContext();
|
||||
|
||||
EthContext ethContext = mock(EthContext.class);
|
||||
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
|
||||
EthPeers ethPeers = mock(EthPeers.class);
|
||||
when(ethContext.getEthPeers()).thenReturn(ethPeers);
|
||||
|
||||
|
||||
@@ -23,9 +23,11 @@ import org.hyperledger.besu.util.ExceptionUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
@@ -34,6 +36,8 @@ import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@@ -295,4 +299,49 @@ public class EthScheduler {
|
||||
delay,
|
||||
unit);
|
||||
}
|
||||
|
||||
public <ITEM> OrderedProcessor<ITEM> createOrderedProcessor(final Consumer<ITEM> processor) {
|
||||
return new OrderedProcessor<>(processor);
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is a way to execute a set of tasks, one by one, in a strict order, without blocking
|
||||
* the caller in case there are still previous tasks queued
|
||||
*
|
||||
* @param <ITEM> the class of item to be processed
|
||||
*/
|
||||
public class OrderedProcessor<ITEM> {
|
||||
private final Queue<ITEM> blockAddedQueue = new ConcurrentLinkedQueue<>();
|
||||
private final ReentrantLock blockAddedLock = new ReentrantLock();
|
||||
private final Consumer<ITEM> processor;
|
||||
|
||||
private OrderedProcessor(final Consumer<ITEM> processor) {
|
||||
this.processor = processor;
|
||||
}
|
||||
|
||||
public void submit(final ITEM item) {
|
||||
// add the item to the processing queue
|
||||
blockAddedQueue.add(item);
|
||||
|
||||
if (blockAddedLock.hasQueuedThreads()) {
|
||||
// another thread is already waiting to process the queue with our item, there is no need to
|
||||
// schedule another thread
|
||||
LOG.trace(
|
||||
"Block added event queue is already being processed and an already queued thread is present, nothing to do");
|
||||
} else {
|
||||
servicesExecutor.submit(
|
||||
() -> {
|
||||
blockAddedLock.lock();
|
||||
try {
|
||||
// now that we have the lock, process as many items as possible
|
||||
for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) {
|
||||
processor.accept(i);
|
||||
}
|
||||
} finally {
|
||||
blockAddedLock.unlock();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
|
||||
import org.hyperledger.besu.ethereum.core.Transaction;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
|
||||
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
|
||||
import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams;
|
||||
import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
|
||||
@@ -61,11 +62,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
@@ -107,8 +106,7 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
private volatile OptionalLong subscribeConnectId = OptionalLong.empty();
|
||||
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
|
||||
private final Set<Address> localSenders = ConcurrentHashMap.newKeySet();
|
||||
private final Lock blockAddedLock = new ReentrantLock();
|
||||
private final Queue<BlockAddedEvent> blockAddedQueue = new ConcurrentLinkedQueue<>();
|
||||
private final EthScheduler.OrderedProcessor<BlockAddedEvent> blockAddedEventOrderedProcessor;
|
||||
|
||||
public TransactionPool(
|
||||
final Supplier<PendingTransactions> pendingTransactionsSupplier,
|
||||
@@ -130,6 +128,8 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
pluginTransactionValidatorFactory == null
|
||||
? null
|
||||
: pluginTransactionValidatorFactory.create();
|
||||
this.blockAddedEventOrderedProcessor =
|
||||
ethContext.getScheduler().createOrderedProcessor(this::processBlockAddedEvent);
|
||||
initLogForReplay();
|
||||
}
|
||||
|
||||
@@ -322,58 +322,29 @@ public class TransactionPool implements BlockAddedObserver {
|
||||
@Override
|
||||
public void onBlockAdded(final BlockAddedEvent event) {
|
||||
if (isPoolEnabled.get()) {
|
||||
final long started = System.currentTimeMillis();
|
||||
if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED)
|
||||
|| event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) {
|
||||
|
||||
// add the event to the processing queue
|
||||
blockAddedQueue.add(event);
|
||||
|
||||
// we want to process the added block asynchronously,
|
||||
// but at the same time we must ensure that blocks are processed in order one at time
|
||||
ethContext
|
||||
.getScheduler()
|
||||
.scheduleServiceTask(
|
||||
() -> {
|
||||
while (!blockAddedQueue.isEmpty()) {
|
||||
if (blockAddedLock.tryLock()) {
|
||||
// no other thread is processing the queue, so start processing it
|
||||
try {
|
||||
BlockAddedEvent e = blockAddedQueue.poll();
|
||||
// check again since another thread could have stolen our task
|
||||
if (e != null) {
|
||||
pendingTransactions.manageBlockAdded(
|
||||
e.getBlock().getHeader(),
|
||||
e.getAddedTransactions(),
|
||||
e.getRemovedTransactions(),
|
||||
protocolSchedule
|
||||
.getByBlockHeader(e.getBlock().getHeader())
|
||||
.getFeeMarket());
|
||||
reAddTransactions(e.getRemovedTransactions());
|
||||
LOG.atTrace()
|
||||
.setMessage("Block added event {} processed in {}ms")
|
||||
.addArgument(e)
|
||||
.addArgument(() -> System.currentTimeMillis() - started)
|
||||
.log();
|
||||
}
|
||||
} finally {
|
||||
blockAddedLock.unlock();
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
// wait a bit before retrying
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
blockAddedEventOrderedProcessor.submit(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processBlockAddedEvent(final BlockAddedEvent e) {
|
||||
final long started = System.currentTimeMillis();
|
||||
pendingTransactions.manageBlockAdded(
|
||||
e.getBlock().getHeader(),
|
||||
e.getAddedTransactions(),
|
||||
e.getRemovedTransactions(),
|
||||
protocolSchedule.getByBlockHeader(e.getBlock().getHeader()).getFeeMarket());
|
||||
reAddTransactions(e.getRemovedTransactions());
|
||||
LOG.atTrace()
|
||||
.setMessage("Block added event {} processed in {}ms")
|
||||
.addArgument(e)
|
||||
.addArgument(() -> System.currentTimeMillis() - started)
|
||||
.log();
|
||||
}
|
||||
|
||||
private void reAddTransactions(final List<Transaction> reAddTransactions) {
|
||||
if (!reAddTransactions.isEmpty()) {
|
||||
// if adding a blob tx, and it is missing its blob, is a re-org and we should restore the blob
|
||||
|
||||
@@ -156,24 +156,62 @@ public class TransactionPoolFactory {
|
||||
@Override
|
||||
public void onInitialSyncCompleted() {
|
||||
LOG.info("Enabling transaction handling following initial sync");
|
||||
transactionTracker.reset();
|
||||
transactionPool.setEnabled();
|
||||
transactionsMessageHandler.setEnabled();
|
||||
pooledTransactionsMessageHandler.setEnabled();
|
||||
enableTransactionHandling(
|
||||
transactionTracker,
|
||||
transactionPool,
|
||||
transactionsMessageHandler,
|
||||
pooledTransactionsMessageHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInitialSyncRestart() {
|
||||
LOG.info("Disabling transaction handling during re-sync");
|
||||
pooledTransactionsMessageHandler.setDisabled();
|
||||
transactionsMessageHandler.setDisabled();
|
||||
transactionPool.setDisabled();
|
||||
disableTransactionHandling(
|
||||
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
|
||||
}
|
||||
});
|
||||
|
||||
syncState.subscribeInSync(
|
||||
isInSync -> {
|
||||
if (isInSync != transactionPool.isEnabled()) {
|
||||
if (isInSync) {
|
||||
LOG.info("Node is in sync, enabling transaction handling");
|
||||
enableTransactionHandling(
|
||||
transactionTracker,
|
||||
transactionPool,
|
||||
transactionsMessageHandler,
|
||||
pooledTransactionsMessageHandler);
|
||||
} else {
|
||||
LOG.info("Node out of sync, disabling transaction handling");
|
||||
disableTransactionHandling(
|
||||
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return transactionPool;
|
||||
}
|
||||
|
||||
private static void enableTransactionHandling(
|
||||
final PeerTransactionTracker transactionTracker,
|
||||
final TransactionPool transactionPool,
|
||||
final TransactionsMessageHandler transactionsMessageHandler,
|
||||
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
|
||||
transactionTracker.reset();
|
||||
transactionPool.setEnabled();
|
||||
transactionsMessageHandler.setEnabled();
|
||||
pooledTransactionsMessageHandler.setEnabled();
|
||||
}
|
||||
|
||||
private static void disableTransactionHandling(
|
||||
final TransactionPool transactionPool,
|
||||
final TransactionsMessageHandler transactionsMessageHandler,
|
||||
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
|
||||
transactionPool.setDisabled();
|
||||
transactionsMessageHandler.setDisabled();
|
||||
pooledTransactionsMessageHandler.setDisabled();
|
||||
}
|
||||
|
||||
private static void subscribeTransactionHandlers(
|
||||
final ProtocolContext protocolContext,
|
||||
final EthContext ethContext,
|
||||
|
||||
@@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.transactions;
|
||||
|
||||
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
|
||||
import org.hyperledger.besu.metrics.BesuMetricCategory;
|
||||
import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier;
|
||||
import org.hyperledger.besu.metrics.RunnableCounter;
|
||||
import org.hyperledger.besu.plugin.services.MetricsSystem;
|
||||
import org.hyperledger.besu.plugin.services.metrics.Counter;
|
||||
@@ -46,6 +47,9 @@ public class TransactionPoolMetrics {
|
||||
private final LabelledMetric<Counter> expiredMessagesCounter;
|
||||
private final Map<String, RunnableCounter> expiredMessagesRunnableCounters = new HashMap<>();
|
||||
private final LabelledMetric<Counter> alreadySeenTransactionsCounter;
|
||||
private final Map<String, ReplaceableDoubleSupplier> spaceUsedSuppliers = new HashMap<>();
|
||||
private final Map<String, ReplaceableDoubleSupplier> transactionCountSuppliers = new HashMap<>();
|
||||
private final Map<String, ReplaceableDoubleSupplier> uniqueSendersSuppliers = new HashMap<>();
|
||||
|
||||
public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
|
||||
this.metricsSystem = metricsSystem;
|
||||
@@ -120,17 +124,44 @@ public class TransactionPoolMetrics {
|
||||
}
|
||||
|
||||
public void initSpaceUsed(final DoubleSupplier spaceUsedSupplier, final String layer) {
|
||||
spaceUsed.labels(spaceUsedSupplier, layer);
|
||||
spaceUsedSuppliers.compute(
|
||||
layer,
|
||||
(unused, existingSupplier) -> {
|
||||
if (existingSupplier == null) {
|
||||
final var newSupplier = new ReplaceableDoubleSupplier(spaceUsedSupplier);
|
||||
spaceUsed.labels(newSupplier, layer);
|
||||
return newSupplier;
|
||||
}
|
||||
return existingSupplier.replaceDoubleSupplier(spaceUsedSupplier);
|
||||
});
|
||||
}
|
||||
|
||||
public void initTransactionCount(
|
||||
final DoubleSupplier transactionCountSupplier, final String layer) {
|
||||
transactionCount.labels(transactionCountSupplier, layer);
|
||||
transactionCountSuppliers.compute(
|
||||
layer,
|
||||
(unused, existingSupplier) -> {
|
||||
if (existingSupplier == null) {
|
||||
final var newSupplier = new ReplaceableDoubleSupplier(transactionCountSupplier);
|
||||
transactionCount.labels(newSupplier, layer);
|
||||
return newSupplier;
|
||||
}
|
||||
return existingSupplier.replaceDoubleSupplier(transactionCountSupplier);
|
||||
});
|
||||
}
|
||||
|
||||
public void initUniqueSenderCount(
|
||||
final DoubleSupplier uniqueSenderCountSupplier, final String layer) {
|
||||
uniqueSenderCount.labels(uniqueSenderCountSupplier, layer);
|
||||
uniqueSendersSuppliers.compute(
|
||||
layer,
|
||||
(unused, existingSupplier) -> {
|
||||
if (existingSupplier == null) {
|
||||
final var newSupplier = new ReplaceableDoubleSupplier(uniqueSenderCountSupplier);
|
||||
uniqueSenderCount.labels(newSupplier, layer);
|
||||
return newSupplier;
|
||||
}
|
||||
return existingSupplier.replaceDoubleSupplier(uniqueSenderCountSupplier);
|
||||
});
|
||||
}
|
||||
|
||||
public void initExpiredMessagesCounter(final String message) {
|
||||
|
||||
@@ -21,16 +21,24 @@ import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
|
||||
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
|
||||
import org.hyperledger.besu.testutil.MockExecutorService;
|
||||
import org.hyperledger.besu.testutil.MockScheduledExecutor;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -209,4 +217,43 @@ public class EthSchedulerTest {
|
||||
assertThat(task.isFailed()).isTrue();
|
||||
assertThat(task.isCancelled()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void itemsSubmittedToOrderedProcessorAreProcessedInOrder() throws InterruptedException {
|
||||
final int numOfItems = 100;
|
||||
final Random random = new Random();
|
||||
final EthScheduler realEthScheduler = new EthScheduler(1, 1, 1, new NoOpMetricsSystem());
|
||||
|
||||
final List<String> processedStrings = new CopyOnWriteArrayList<>();
|
||||
|
||||
final Consumer<String> stringProcessor =
|
||||
s -> {
|
||||
processedStrings.add(s);
|
||||
try {
|
||||
Thread.sleep(random.nextInt(20));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
|
||||
final var orderProcessor = realEthScheduler.createOrderedProcessor(stringProcessor);
|
||||
IntStream.range(0, numOfItems)
|
||||
.mapToObj(String::valueOf)
|
||||
.forEach(
|
||||
s -> {
|
||||
orderProcessor.submit(s);
|
||||
try {
|
||||
Thread.sleep(random.nextInt(20));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
final List<String> expectedStrings = new ArrayList<>(numOfItems);
|
||||
IntStream.range(0, numOfItems).mapToObj(String::valueOf).forEach(expectedStrings::add);
|
||||
|
||||
Awaitility.await().until(() -> processedStrings.size() == numOfItems);
|
||||
|
||||
assertThat(processedStrings).containsExactlyElementsOf(expectedStrings);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,7 +166,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
|
||||
RespondingEthPeer.blockchainResponder(
|
||||
blockchain, protocolContext.getWorldStateArchive(), transactionPool);
|
||||
final RespondingEthPeer respondingPeer =
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
|
||||
|
||||
// Setup data to be requested and expected response
|
||||
final T requestedData = generateDataToBeRequested();
|
||||
@@ -190,7 +190,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
|
||||
@Test
|
||||
public void doesNotCompleteWhenPeersDoNotRespond() {
|
||||
// Setup a unresponsive peer
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
|
||||
|
||||
// Setup data to be requested
|
||||
final T requestedData = generateDataToBeRequested();
|
||||
@@ -209,7 +209,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
|
||||
@Test
|
||||
public void cancel() {
|
||||
// Setup a unresponsive peer
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
|
||||
|
||||
// Setup data to be requested
|
||||
final T requestedData = generateDataToBeRequested();
|
||||
|
||||
@@ -58,7 +58,7 @@ public abstract class PeerMessageTaskTest<T>
|
||||
protocolSchedule,
|
||||
0.5f);
|
||||
final RespondingEthPeer respondingEthPeer =
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
|
||||
|
||||
// Execute task and wait for response
|
||||
final AtomicReference<T> actualResult = new AtomicReference<>();
|
||||
@@ -109,7 +109,7 @@ public abstract class PeerMessageTaskTest<T>
|
||||
// Setup a unresponsive peer
|
||||
final RespondingEthPeer.Responder responder = RespondingEthPeer.emptyResponder();
|
||||
final RespondingEthPeer respondingEthPeer =
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
|
||||
|
||||
// Setup data to be requested
|
||||
final T requestedData = generateDataToBeRequested();
|
||||
@@ -129,7 +129,7 @@ public abstract class PeerMessageTaskTest<T>
|
||||
peersDoTimeout.set(true);
|
||||
// Setup a unresponsive peer
|
||||
final RespondingEthPeer respondingEthPeer =
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
|
||||
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
|
||||
|
||||
// Setup data to be requested
|
||||
final T requestedData = generateDataToBeRequested();
|
||||
|
||||
@@ -32,7 +32,6 @@ import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.nullable;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
@@ -100,7 +99,6 @@ import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@@ -233,12 +231,9 @@ public abstract class AbstractTransactionPoolTest {
|
||||
ethProtocolManager = EthProtocolManagerTestUtil.create();
|
||||
ethContext = spy(ethProtocolManager.ethContext());
|
||||
|
||||
final EthScheduler ethScheduler = mock(EthScheduler.class);
|
||||
final EthScheduler ethScheduler = spy(ethContext.getScheduler());
|
||||
syncTaskCapture = ArgumentCaptor.forClass(Runnable.class);
|
||||
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
|
||||
doAnswer(invocation -> ((Supplier<Void>) invocation.getArguments()[0]).get())
|
||||
.when(ethScheduler)
|
||||
.scheduleServiceTask(any(Supplier.class));
|
||||
doReturn(ethScheduler).when(ethContext).getScheduler();
|
||||
|
||||
peerTransactionTracker = new PeerTransactionTracker();
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright Hyperledger Besu Contributors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
package org.hyperledger.besu.metrics;
|
||||
|
||||
import java.util.function.DoubleSupplier;
|
||||
|
||||
/**
|
||||
* This class provides a replaceable double supplier. It allows to replace the current double
|
||||
* supplier with a new one.
|
||||
*/
|
||||
public class ReplaceableDoubleSupplier implements DoubleSupplier {
|
||||
private DoubleSupplier currentSupplier;
|
||||
|
||||
/**
|
||||
* Constructs a new ReplaceableDoubleSupplier with the given initial supplier.
|
||||
*
|
||||
* @param currentSupplier the initial double supplier
|
||||
*/
|
||||
public ReplaceableDoubleSupplier(final DoubleSupplier currentSupplier) {
|
||||
this.currentSupplier = currentSupplier;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a double value from the current supplier.
|
||||
*
|
||||
* @return the double value supplied by the current supplier
|
||||
*/
|
||||
@Override
|
||||
public double getAsDouble() {
|
||||
return currentSupplier.getAsDouble();
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the current double supplier with a new one.
|
||||
*
|
||||
* @param newSupplier the new double supplier
|
||||
* @return this ReplaceableDoubleSupplier
|
||||
*/
|
||||
public ReplaceableDoubleSupplier replaceDoubleSupplier(final DoubleSupplier newSupplier) {
|
||||
currentSupplier = newSupplier;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright Hyperledger Besu Contributors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
package org.hyperledger.besu.metrics;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class ReplaceableDoubleSupplierTest {
|
||||
|
||||
@Test
|
||||
public void shouldWorkAsNormalSupplier() {
|
||||
final var rds = new ReplaceableDoubleSupplier(() -> 1.0);
|
||||
assertThat(rds.getAsDouble()).isEqualTo(1.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReturnValueFromNewSupplierIfReplaced() {
|
||||
final var rds = new ReplaceableDoubleSupplier(() -> 1.0);
|
||||
assertThat(rds.getAsDouble()).isEqualTo(1.0);
|
||||
rds.replaceDoubleSupplier(() -> 2.0);
|
||||
assertThat(rds.getAsDouble()).isEqualTo(2.0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user