mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-09 15:37:54 -05:00
8053: Add GetPooledTransactionsFromPeerTask and appropriate usage (#8058)
* 8053: Add GetPooledTransactionsFromPeerTask and appropriate usage Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * 7582: Loosen restriction to allow fewer or equal results than requested, but not more Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * 8053: Protect against null message response in peer task executor Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * 8053: spotless Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> --------- Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
This commit is contained in:
@@ -717,7 +717,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
|
||||
syncState,
|
||||
transactionPoolConfiguration,
|
||||
besuComponent.map(BesuComponent::getBlobCache).orElse(new BlobCache()),
|
||||
miningConfiguration);
|
||||
miningConfiguration,
|
||||
syncConfig.isPeerTaskSystemEnabled());
|
||||
|
||||
final List<PeerValidator> peerValidators =
|
||||
createPeerValidators(protocolSchedule, peerTaskExecutor);
|
||||
|
||||
@@ -173,7 +173,8 @@ public class BesuEventsImplTest {
|
||||
syncState,
|
||||
txPoolConfig,
|
||||
new BlobCache(),
|
||||
MiningConfiguration.newDefault());
|
||||
MiningConfiguration.newDefault(),
|
||||
false);
|
||||
|
||||
serviceImpl =
|
||||
new BesuEventsImpl(
|
||||
|
||||
@@ -136,6 +136,10 @@ public class PeerTaskExecutor {
|
||||
MessageData responseMessageData =
|
||||
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);
|
||||
|
||||
if (responseMessageData == null) {
|
||||
throw new InvalidPeerTaskResponseException();
|
||||
}
|
||||
|
||||
result = peerTask.processResponse(responseMessageData);
|
||||
} finally {
|
||||
inflightRequestCountForThisTaskClass.decrementAndGet();
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Copyright contributors to Besu.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;
|
||||
|
||||
import org.hyperledger.besu.datatypes.Hash;
|
||||
import org.hyperledger.besu.ethereum.core.Transaction;
|
||||
import org.hyperledger.besu.ethereum.eth.EthProtocol;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse;
|
||||
import org.hyperledger.besu.ethereum.eth.messages.GetPooledTransactionsMessage;
|
||||
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
|
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
|
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class GetPooledTransactionsFromPeerTask implements PeerTask<List<Transaction>> {
|
||||
|
||||
private final List<Hash> hashes;
|
||||
|
||||
public GetPooledTransactionsFromPeerTask(final List<Hash> hashes) {
|
||||
this.hashes = hashes.stream().distinct().toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubProtocol getSubProtocol() {
|
||||
return EthProtocol.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageData getRequestMessage() {
|
||||
return GetPooledTransactionsMessage.create(hashes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Transaction> processResponse(final MessageData messageData)
|
||||
throws InvalidPeerTaskResponseException {
|
||||
final PooledTransactionsMessage pooledTransactionsMessage =
|
||||
PooledTransactionsMessage.readFrom(messageData);
|
||||
final List<Transaction> responseTransactions = pooledTransactionsMessage.transactions();
|
||||
if (responseTransactions.size() > hashes.size()) {
|
||||
throw new InvalidPeerTaskResponseException(
|
||||
"Response transaction count does not match request hash count");
|
||||
}
|
||||
return responseTransactions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Predicate<EthPeer> getPeerRequirementFilter() {
|
||||
return (peer) -> true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerTaskValidationResponse validateResult(final List<Transaction> result) {
|
||||
if (!result.stream().allMatch((t) -> hashes.contains(t.getHash()))) {
|
||||
return PeerTaskValidationResponse.RESULTS_DO_NOT_MATCH_QUERY;
|
||||
}
|
||||
return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD;
|
||||
}
|
||||
|
||||
public List<Hash> getHashes() {
|
||||
return hashes;
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,8 @@ import org.hyperledger.besu.datatypes.Hash;
|
||||
import org.hyperledger.besu.ethereum.core.Transaction;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
|
||||
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
|
||||
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
|
||||
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
|
||||
@@ -28,6 +30,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
import com.google.common.collect.EvictingQueue;
|
||||
@@ -49,6 +52,7 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
|
||||
private final ScheduledFuture<?> scheduledFuture;
|
||||
private final EthPeer peer;
|
||||
private final Queue<Hash> txAnnounces;
|
||||
private final boolean isPeerTaskSystemEnabled;
|
||||
|
||||
public BufferedGetPooledTransactionsFromPeerFetcher(
|
||||
final EthContext ethContext,
|
||||
@@ -57,7 +61,8 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
|
||||
final TransactionPool transactionPool,
|
||||
final PeerTransactionTracker transactionTracker,
|
||||
final TransactionPoolMetrics metrics,
|
||||
final String metricLabel) {
|
||||
final String metricLabel,
|
||||
final boolean isPeerTaskSystemEnabled) {
|
||||
this.ethContext = ethContext;
|
||||
this.scheduledFuture = scheduledFuture;
|
||||
this.peer = peer;
|
||||
@@ -67,6 +72,7 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
|
||||
this.metricLabel = metricLabel;
|
||||
this.txAnnounces =
|
||||
Queues.synchronizedQueue(EvictingQueue.create(DEFAULT_MAX_PENDING_TRANSACTIONS));
|
||||
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> getScheduledFuture() {
|
||||
@@ -76,22 +82,48 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
|
||||
public void requestTransactions() {
|
||||
List<Hash> txHashesAnnounced;
|
||||
while (!(txHashesAnnounced = getTxHashesAnnounced()).isEmpty()) {
|
||||
CompletableFuture<List<Transaction>> futureTransactions;
|
||||
if (isPeerTaskSystemEnabled) {
|
||||
final org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask
|
||||
task =
|
||||
new org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask(txHashesAnnounced);
|
||||
futureTransactions =
|
||||
ethContext
|
||||
.getScheduler()
|
||||
.scheduleSyncWorkerTask(
|
||||
() -> {
|
||||
PeerTaskExecutorResult<List<Transaction>> taskResult =
|
||||
ethContext.getPeerTaskExecutor().executeAgainstPeer(task, peer);
|
||||
if (taskResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
|
||||
|| taskResult.result().isEmpty()) {
|
||||
return CompletableFuture.failedFuture(
|
||||
new RuntimeException("Failed to retrieve transactions for hashes"));
|
||||
}
|
||||
return CompletableFuture.completedFuture(taskResult.result().get());
|
||||
});
|
||||
} else {
|
||||
final GetPooledTransactionsFromPeerTask task =
|
||||
GetPooledTransactionsFromPeerTask.forHashes(
|
||||
ethContext, txHashesAnnounced, metrics.getMetricsSystem());
|
||||
task.assignPeer(peer);
|
||||
futureTransactions =
|
||||
ethContext
|
||||
.getScheduler()
|
||||
.scheduleSyncWorkerTask(task)
|
||||
.thenAccept(
|
||||
result -> {
|
||||
List<Transaction> retrievedTransactions = result.getResult();
|
||||
.thenCompose(
|
||||
(peerTaskResult) ->
|
||||
CompletableFuture.completedFuture(peerTaskResult.getResult()));
|
||||
}
|
||||
|
||||
futureTransactions.thenAccept(
|
||||
retrievedTransactions -> {
|
||||
transactionTracker.markTransactionsAsSeen(peer, retrievedTransactions);
|
||||
|
||||
LOG.atTrace()
|
||||
.setMessage("Got {} transactions of {} hashes requested from peer {}")
|
||||
.setMessage("Got {} transactions requested from peer {}")
|
||||
.addArgument(retrievedTransactions::size)
|
||||
.addArgument(task.getTransactionHashes()::size)
|
||||
.addArgument(peer::getLoggableId)
|
||||
.log();
|
||||
|
||||
|
||||
@@ -48,13 +48,15 @@ public class NewPooledTransactionHashesMessageProcessor {
|
||||
private final TransactionPoolConfiguration transactionPoolConfiguration;
|
||||
private final EthContext ethContext;
|
||||
private final TransactionPoolMetrics metrics;
|
||||
private final boolean isPeerTaskSystemEnabled;
|
||||
|
||||
public NewPooledTransactionHashesMessageProcessor(
|
||||
final PeerTransactionTracker transactionTracker,
|
||||
final TransactionPool transactionPool,
|
||||
final TransactionPoolConfiguration transactionPoolConfiguration,
|
||||
final EthContext ethContext,
|
||||
final TransactionPoolMetrics metrics) {
|
||||
final TransactionPoolMetrics metrics,
|
||||
final boolean isPeerTaskSystemEnabled) {
|
||||
this.transactionTracker = transactionTracker;
|
||||
this.transactionPool = transactionPool;
|
||||
this.transactionPoolConfiguration = transactionPoolConfiguration;
|
||||
@@ -62,6 +64,7 @@ public class NewPooledTransactionHashesMessageProcessor {
|
||||
this.metrics = metrics;
|
||||
metrics.initExpiredMessagesCounter(METRIC_LABEL);
|
||||
this.scheduledTasks = new ConcurrentHashMap<>();
|
||||
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
|
||||
}
|
||||
|
||||
void processNewPooledTransactionHashesMessage(
|
||||
@@ -114,7 +117,8 @@ public class NewPooledTransactionHashesMessageProcessor {
|
||||
transactionPool,
|
||||
transactionTracker,
|
||||
metrics,
|
||||
METRIC_LABEL);
|
||||
METRIC_LABEL,
|
||||
isPeerTaskSystemEnabled);
|
||||
});
|
||||
|
||||
bufferedTask.addHashes(
|
||||
|
||||
@@ -56,7 +56,8 @@ public class TransactionPoolFactory {
|
||||
final SyncState syncState,
|
||||
final TransactionPoolConfiguration transactionPoolConfiguration,
|
||||
final BlobCache blobCache,
|
||||
final MiningConfiguration miningConfiguration) {
|
||||
final MiningConfiguration miningConfiguration,
|
||||
final boolean isPeerTaskSystemEnabled) {
|
||||
|
||||
final TransactionPoolMetrics metrics = new TransactionPoolMetrics(metricsSystem);
|
||||
|
||||
@@ -80,7 +81,8 @@ public class TransactionPoolFactory {
|
||||
transactionsMessageSender,
|
||||
newPooledTransactionHashesMessageSender,
|
||||
blobCache,
|
||||
miningConfiguration);
|
||||
miningConfiguration,
|
||||
isPeerTaskSystemEnabled);
|
||||
}
|
||||
|
||||
static TransactionPool createTransactionPool(
|
||||
@@ -95,7 +97,8 @@ public class TransactionPoolFactory {
|
||||
final TransactionsMessageSender transactionsMessageSender,
|
||||
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
|
||||
final BlobCache blobCache,
|
||||
final MiningConfiguration miningConfiguration) {
|
||||
final MiningConfiguration miningConfiguration,
|
||||
final boolean isPeerTaskSystemEnabled) {
|
||||
|
||||
final TransactionPool transactionPool =
|
||||
new TransactionPool(
|
||||
@@ -135,7 +138,8 @@ public class TransactionPoolFactory {
|
||||
transactionPool,
|
||||
transactionPoolConfiguration,
|
||||
ethContext,
|
||||
metrics),
|
||||
metrics,
|
||||
isPeerTaskSystemEnabled),
|
||||
transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds());
|
||||
|
||||
subscribeTransactionHandlers(
|
||||
|
||||
@@ -1149,7 +1149,8 @@ public final class EthProtocolManagerTest {
|
||||
new SyncState(blockchain, ethManager.ethContext().getEthPeers()),
|
||||
TransactionPoolConfiguration.DEFAULT,
|
||||
new BlobCache(),
|
||||
MiningConfiguration.newDefault())
|
||||
MiningConfiguration.newDefault(),
|
||||
false)
|
||||
.setEnabled();
|
||||
|
||||
// Send just a transaction message.
|
||||
|
||||
@@ -147,7 +147,8 @@ public abstract class AbstractMessageTaskTest<T, R> {
|
||||
syncState,
|
||||
TransactionPoolConfiguration.DEFAULT,
|
||||
new BlobCache(),
|
||||
MiningConfiguration.newDefault());
|
||||
MiningConfiguration.newDefault(),
|
||||
false);
|
||||
transactionPool.setEnabled();
|
||||
|
||||
ethProtocolManager =
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Copyright contributors to Besu.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;
|
||||
|
||||
import org.hyperledger.besu.datatypes.Hash;
|
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
|
||||
import org.hyperledger.besu.ethereum.core.Transaction;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse;
|
||||
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
|
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class GetPooledTransactionsFromPeerTaskTest {
|
||||
private static final BlockDataGenerator GENERATOR = new BlockDataGenerator();
|
||||
|
||||
@Test
|
||||
public void testGetRequestMessage() {
|
||||
List<Hash> hashes = List.of(Hash.EMPTY);
|
||||
GetPooledTransactionsFromPeerTask task = new GetPooledTransactionsFromPeerTask(hashes);
|
||||
|
||||
MessageData result = task.getRequestMessage();
|
||||
|
||||
Assertions.assertEquals(
|
||||
"0xe1a0c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470",
|
||||
result.getData().toHexString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessResponse() throws InvalidPeerTaskResponseException {
|
||||
List<Hash> hashes = List.of(Hash.EMPTY);
|
||||
GetPooledTransactionsFromPeerTask task = new GetPooledTransactionsFromPeerTask(hashes);
|
||||
|
||||
Transaction transaction = GENERATOR.transaction();
|
||||
PooledTransactionsMessage pooledTransactionsMessage =
|
||||
PooledTransactionsMessage.create(List.of(transaction));
|
||||
|
||||
List<Transaction> result = task.processResponse(pooledTransactionsMessage);
|
||||
|
||||
Assertions.assertEquals(List.of(transaction), result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessResponseWithIncorrectTransactionCount() {
|
||||
List<Hash> hashes = List.of(Hash.EMPTY);
|
||||
GetPooledTransactionsFromPeerTask task = new GetPooledTransactionsFromPeerTask(hashes);
|
||||
|
||||
PooledTransactionsMessage pooledTransactionsMessage =
|
||||
PooledTransactionsMessage.create(List.of(GENERATOR.transaction(), GENERATOR.transaction()));
|
||||
|
||||
InvalidPeerTaskResponseException exception =
|
||||
Assertions.assertThrows(
|
||||
InvalidPeerTaskResponseException.class,
|
||||
() -> task.processResponse(pooledTransactionsMessage));
|
||||
|
||||
Assertions.assertEquals(
|
||||
"Response transaction count does not match request hash count", exception.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateResult() {
|
||||
List<Hash> hashes = List.of(Hash.EMPTY);
|
||||
GetPooledTransactionsFromPeerTask task = new GetPooledTransactionsFromPeerTask(hashes);
|
||||
|
||||
Transaction transaction = Mockito.mock(Transaction.class);
|
||||
Mockito.when(transaction.getHash()).thenReturn(Hash.EMPTY);
|
||||
|
||||
PeerTaskValidationResponse validationResponse = task.validateResult(List.of(transaction));
|
||||
Assertions.assertEquals(PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD, validationResponse);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateResultWithMismatchedResults() {
|
||||
List<Hash> hashes = List.of(Hash.EMPTY);
|
||||
GetPooledTransactionsFromPeerTask task = new GetPooledTransactionsFromPeerTask(hashes);
|
||||
|
||||
Transaction transaction = Mockito.mock(Transaction.class);
|
||||
Mockito.when(transaction.getHash()).thenReturn(Hash.EMPTY_TRIE_HASH);
|
||||
|
||||
PeerTaskValidationResponse validationResponse = task.validateResult(List.of(transaction));
|
||||
Assertions.assertEquals(
|
||||
PeerTaskValidationResponse.RESULTS_DO_NOT_MATCH_QUERY, validationResponse);
|
||||
}
|
||||
}
|
||||
@@ -81,7 +81,8 @@ public class BufferedGetPooledTransactionsFromPeerFetcherTest {
|
||||
transactionPool,
|
||||
transactionTracker,
|
||||
new TransactionPoolMetrics(metricsSystem),
|
||||
"new_pooled_transaction_hashes");
|
||||
"new_pooled_transaction_hashes",
|
||||
false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -0,0 +1,184 @@
|
||||
/*
|
||||
* Copyright contributors to Besu.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
package org.hyperledger.besu.ethereum.eth.manager.task;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.hyperledger.besu.datatypes.Hash;
|
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
|
||||
import org.hyperledger.besu.ethereum.core.Transaction;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
|
||||
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
|
||||
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
|
||||
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
|
||||
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
|
||||
import org.hyperledger.besu.metrics.StubMetricsSystem;
|
||||
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import io.netty.util.concurrent.ScheduledFuture;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||
public class BufferedGetPooledTransactionsFromPeerFetcherUsingPeerTaskTest {
|
||||
|
||||
private @Mock EthPeer ethPeer;
|
||||
private @Mock TransactionPool transactionPool;
|
||||
private @Mock EthContext ethContext;
|
||||
private @Mock EthPeers ethPeers;
|
||||
private @Mock PeerTaskExecutor peerTaskExecutor;
|
||||
|
||||
private final BlockDataGenerator generator = new BlockDataGenerator();
|
||||
private final EthScheduler ethScheduler = new DeterministicEthScheduler();
|
||||
|
||||
private BufferedGetPooledTransactionsFromPeerFetcher fetcher;
|
||||
private StubMetricsSystem metricsSystem;
|
||||
private PeerTransactionTracker transactionTracker;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
metricsSystem = new StubMetricsSystem();
|
||||
when(ethContext.getEthPeers()).thenReturn(ethPeers);
|
||||
transactionTracker = new PeerTransactionTracker(ethPeers);
|
||||
when(ethContext.getScheduler()).thenReturn(ethScheduler);
|
||||
when(ethContext.getPeerTaskExecutor()).thenReturn(peerTaskExecutor);
|
||||
ScheduledFuture<?> mock = mock(ScheduledFuture.class);
|
||||
fetcher =
|
||||
new BufferedGetPooledTransactionsFromPeerFetcher(
|
||||
ethContext,
|
||||
mock,
|
||||
ethPeer,
|
||||
transactionPool,
|
||||
transactionTracker,
|
||||
new TransactionPoolMetrics(metricsSystem),
|
||||
"new_pooled_transaction_hashes",
|
||||
true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void requestTransactionShouldStartTaskWhenUnknownTransaction() {
|
||||
final Transaction transaction = generator.transaction();
|
||||
final List<Transaction> taskResult = List.of(transaction);
|
||||
final PeerTaskExecutorResult<List<Transaction>> peerTaskResult =
|
||||
new PeerTaskExecutorResult<List<Transaction>>(
|
||||
Optional.of(taskResult), PeerTaskExecutorResponseCode.SUCCESS, Optional.of(ethPeer));
|
||||
|
||||
when(peerTaskExecutor.executeAgainstPeer(
|
||||
any(
|
||||
org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask.class),
|
||||
eq(ethPeer)))
|
||||
.thenReturn(peerTaskResult);
|
||||
|
||||
fetcher.addHashes(List.of(transaction.getHash()));
|
||||
fetcher.requestTransactions();
|
||||
|
||||
verify(peerTaskExecutor)
|
||||
.executeAgainstPeer(
|
||||
any(
|
||||
org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask.class),
|
||||
eq(ethPeer));
|
||||
verifyNoMoreInteractions(peerTaskExecutor);
|
||||
verify(transactionPool, times(1)).addRemoteTransactions(taskResult);
|
||||
|
||||
assertThat(transactionTracker.hasSeenTransaction(transaction.getHash())).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void requestTransactionShouldSplitRequestIntoSeveralTasks() {
|
||||
final Map<Hash, Transaction> transactionsByHash =
|
||||
IntStream.range(0, 257)
|
||||
.mapToObj(unused -> generator.transaction())
|
||||
.collect(Collectors.toMap((t) -> t.getHash(), (t) -> t));
|
||||
fetcher.addHashes(transactionsByHash.keySet());
|
||||
|
||||
when(peerTaskExecutor.executeAgainstPeer(
|
||||
any(
|
||||
org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask.class),
|
||||
eq(ethPeer)))
|
||||
.thenAnswer(
|
||||
(invocationOnMock) -> {
|
||||
org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask
|
||||
task =
|
||||
invocationOnMock.getArgument(
|
||||
0,
|
||||
org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask.class);
|
||||
List<Transaction> resultTransactions =
|
||||
task.getHashes().stream().map(transactionsByHash::get).toList();
|
||||
return new PeerTaskExecutorResult<List<Transaction>>(
|
||||
Optional.of(resultTransactions),
|
||||
PeerTaskExecutorResponseCode.SUCCESS,
|
||||
Optional.of(ethPeer));
|
||||
});
|
||||
|
||||
fetcher.requestTransactions();
|
||||
|
||||
verify(peerTaskExecutor, times(2))
|
||||
.executeAgainstPeer(
|
||||
any(
|
||||
org.hyperledger.besu.ethereum.eth.manager.peertask.task
|
||||
.GetPooledTransactionsFromPeerTask.class),
|
||||
eq(ethPeer));
|
||||
verifyNoMoreInteractions(peerTaskExecutor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void requestTransactionShouldNotStartTaskWhenTransactionAlreadySeen() {
|
||||
|
||||
final Transaction transaction = generator.transaction();
|
||||
final Hash hash = transaction.getHash();
|
||||
transactionTracker.markTransactionHashesAsSeen(ethPeer, List.of(hash));
|
||||
|
||||
fetcher.addHashes(List.of(hash));
|
||||
fetcher.requestTransactions();
|
||||
|
||||
verifyNoInteractions(peerTaskExecutor);
|
||||
verify(transactionPool, never()).addRemoteTransactions(List.of(transaction));
|
||||
assertThat(
|
||||
metricsSystem.getCounterValue(
|
||||
"remote_transactions_already_seen_total", "new_pooled_transaction_hashes"))
|
||||
.isEqualTo(1);
|
||||
}
|
||||
}
|
||||
@@ -100,7 +100,8 @@ class NewPooledTransactionHashesMessageProcessorTest {
|
||||
transactionPool,
|
||||
transactionPoolConfiguration,
|
||||
ethContext,
|
||||
new TransactionPoolMetrics(metricsSystem));
|
||||
new TransactionPoolMetrics(metricsSystem),
|
||||
false);
|
||||
when(ethContext.getScheduler()).thenReturn(ethScheduler);
|
||||
}
|
||||
|
||||
|
||||
@@ -182,7 +182,8 @@ public class TestNode implements Closeable {
|
||||
syncState,
|
||||
TransactionPoolConfiguration.DEFAULT,
|
||||
new BlobCache(),
|
||||
MiningConfiguration.newDefault());
|
||||
MiningConfiguration.newDefault(),
|
||||
false);
|
||||
|
||||
final EthProtocolManager ethProtocolManager =
|
||||
new EthProtocolManager(
|
||||
|
||||
@@ -412,7 +412,8 @@ public class TransactionPoolFactoryTest {
|
||||
transactionsMessageSender,
|
||||
newPooledTransactionHashesMessageSender,
|
||||
new BlobCache(),
|
||||
MiningConfiguration.newDefault());
|
||||
MiningConfiguration.newDefault(),
|
||||
false);
|
||||
}
|
||||
|
||||
private TransactionPool createAndEnableTransactionPool(
|
||||
|
||||
Reference in New Issue
Block a user