Merge branch 'main' into zkbesu

This commit is contained in:
Fabio Di Fabio
2024-10-18 10:32:33 +02:00
44 changed files with 1190 additions and 122 deletions

View File

@@ -1,12 +1,14 @@
# Changelog
## [Unreleased]
- Added isLabelsObserved to LabelledGauge in plugin-api. Default implementation returns false.
### Breaking Changes
### Upcoming Breaking Changes
### Additions and Improvements
- Fine tune already seen txs tracker when a tx is removed from the pool [#7755](https://github.com/hyperledger/besu/pull/7755)
### Bug fixes

View File

@@ -133,7 +133,7 @@ public class BesuEventsImpl implements BesuEvents {
public long addTransactionDroppedListener(
final TransactionDroppedListener transactionDroppedListener) {
return transactionPool.subscribeDroppedTransactions(
transactionDroppedListener::onTransactionDropped);
(transaction, reason) -> transactionDroppedListener.onTransactionDropped(transaction));
}
@Override

View File

@@ -20,6 +20,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.Subscrip
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;
import java.util.List;
@@ -34,7 +35,7 @@ public class PendingTransactionDroppedSubscriptionService
}
@Override
public void onTransactionDropped(final Transaction transaction) {
public void onTransactionDropped(final Transaction transaction, final RemovalReason reason) {
notifySubscribers(transaction.getHash());
}

View File

@@ -157,12 +157,12 @@ public class EthCallTest {
when(result.isSuccessful()).thenReturn(true);
when(result.getValidationResult()).thenReturn(ValidationResult.valid());
when(result.getOutput()).thenReturn(Bytes.of(1));
verify(transactionSimulator).process(any(), any(), any(), mapperCaptor.capture(), any());
verify(transactionSimulator)
.process(eq(callParameter()), any(), any(), mapperCaptor.capture(), any());
assertThat(mapperCaptor.getValue().apply(mock(MutableWorldState.class), Optional.of(result)))
.isEqualTo(Optional.of(expectedResponse));
assertThat(response).usingRecursiveComparison().isEqualTo(expectedResponse);
verify(transactionSimulator).process(eq(callParameter()), any(), any(), any(), any());
verify(blockchainQueries, atLeastOnce()).getBlockchain();
verifyNoMoreInteractions(blockchainQueries);
}
@@ -174,10 +174,9 @@ public class EthCallTest {
// Expect a revert error with no decoded reason (error doesn't begin "Error(string)" so ignored)
final String abiHexString = "0x1234";
final JsonRpcError expectedError = new JsonRpcError(REVERT_ERROR, abiHexString);
final JsonRpcResponse expectedResponse = new JsonRpcErrorResponse(null, expectedError);
final JsonRpcErrorResponse expectedResponse = new JsonRpcErrorResponse(null, expectedError);
assertThat(((JsonRpcErrorResponse) expectedResponse).getError().getMessage())
.isEqualTo("Execution reverted");
assertThat(expectedResponse.getError().getMessage()).isEqualTo("Execution reverted");
mockTransactionProcessorSuccessResult(expectedResponse);
when(blockchainQueries.getBlockchain()).thenReturn(blockchain);
@@ -214,9 +213,9 @@ public class EthCallTest {
// bytes are invalid ABI)
final String abiHexString = "0x08c379a002d36d";
final JsonRpcError expectedError = new JsonRpcError(REVERT_ERROR, abiHexString);
final JsonRpcResponse expectedResponse = new JsonRpcErrorResponse(null, expectedError);
final JsonRpcErrorResponse expectedResponse = new JsonRpcErrorResponse(null, expectedError);
assertThat(((JsonRpcErrorResponse) expectedResponse).getError().getMessage())
assertThat(expectedResponse.getError().getMessage())
.isEqualTo("Execution reverted: ABI decode error");
mockTransactionProcessorSuccessResult(expectedResponse);
@@ -254,9 +253,9 @@ public class EthCallTest {
final String abiHexString =
"0x08c379a00000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002545524332303a207472616e736665722066726f6d20746865207a65726f2061646472657373000000000000000000000000000000000000000000000000000000";
final JsonRpcError expectedError = new JsonRpcError(REVERT_ERROR, abiHexString);
final JsonRpcResponse expectedResponse = new JsonRpcErrorResponse(null, expectedError);
final JsonRpcErrorResponse expectedResponse = new JsonRpcErrorResponse(null, expectedError);
assertThat(((JsonRpcErrorResponse) expectedResponse).getError().getMessage())
assertThat(expectedResponse.getError().getMessage())
.isEqualTo("Execution reverted: ERC20: transfer from the zero address");
mockTransactionProcessorSuccessResult(expectedResponse);
@@ -279,8 +278,6 @@ public class EthCallTest {
when(result.result()).thenReturn(processingResult);
verify(transactionSimulator).process(any(), any(), any(), mapperCaptor.capture(), any());
System.out.println(result);
System.out.println(expectedResponse);
assertThat(mapperCaptor.getValue().apply(mock(MutableWorldState.class), Optional.of(result)))
.isEqualTo(Optional.of(expectedResponse));

View File

@@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;
import java.util.Arrays;
import java.util.HashMap;
@@ -47,6 +48,18 @@ public class PendingTransactionDroppedSubscriptionServiceTest {
private static final Hash TX_ONE =
Hash.fromHexString("0x15876958423545c3c7b0fcf9be8ffb543305ee1b43db87ed380dcf0cd16589f7");
private static final RemovalReason DUMMY_REMOVAL_REASON =
new RemovalReason() {
@Override
public String label() {
return "";
}
@Override
public boolean stopTracking() {
return false;
}
};
@Mock private SubscriptionManager subscriptionManager;
@Mock private Blockchain blockchain;
@@ -65,7 +78,7 @@ public class PendingTransactionDroppedSubscriptionServiceTest {
setUpSubscriptions(subscriptionIds);
final Transaction pending = transaction(TX_ONE);
service.onTransactionDropped(pending);
service.onTransactionDropped(pending, DUMMY_REMOVAL_REASON);
verifyNoInteractions(block);
verifyNoInteractions(blockchain);

View File

@@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker;
@@ -26,6 +27,7 @@ import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
@@ -61,7 +63,7 @@ import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EthPeers {
public class EthPeers implements PeerSelector {
private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class);
public static final Comparator<EthPeer> TOTAL_DIFFICULTY =
Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty());
@@ -465,6 +467,22 @@ public class EthPeers {
this.trailingPeerRequirementsSupplier = tprSupplier;
}
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
return streamAvailablePeers()
.filter(filter)
.filter(EthPeer::hasAvailableRequestCapacity)
.filter(EthPeer::isFullyValidated)
.min(LEAST_TO_MOST_BUSY);
}
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
return Optional.ofNullable(activeConnections.get(peerId.getId()));
}
@FunctionalInterface
public interface ConnectCallback {
void onPeerConnected(EthPeer newPeer);

View File

@@ -0,0 +1,26 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public class InvalidPeerTaskResponseException extends Exception {
public InvalidPeerTaskResponseException() {
super();
}
public InvalidPeerTaskResponseException(final Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,17 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public class NoAvailablePeerException extends Exception {}

View File

@@ -0,0 +1,42 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Optional;
import java.util.function.Predicate;
/** Selects the EthPeers for the PeerTaskExecutor */
public interface PeerSelector {
/**
* Gets a peer matching the supplied filter
*
* @param filter a Predicate\<EthPeer\> matching desirable peers
* @return a peer matching the supplied conditions
*/
Optional<EthPeer> getPeer(final Predicate<EthPeer> filter);
/**
* Attempts to get the EthPeer identified by peerId
*
* @param peerId the peerId of the desired EthPeer
* @return An Optional\<EthPeer\> containing the EthPeer identified by peerId if present in the
* PeerSelector, or empty otherwise
*/
Optional<EthPeer> getPeerByPeerId(PeerId peerId);
}

View File

@@ -0,0 +1,83 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.function.Predicate;
/**
* Represents a task to be executed on an EthPeer by the PeerTaskExecutor
*
* @param <T> The type of the result of this PeerTask
*/
public interface PeerTask<T> {
/**
* Returns the SubProtocol used for this PeerTask
*
* @return the SubProtocol used for this PeerTask
*/
SubProtocol getSubProtocol();
/**
* Gets the request data to send to the EthPeer
*
* @return the request data to send to the EthPeer
*/
MessageData getRequestMessage();
/**
* Parses the MessageData response from the EthPeer
*
* @param messageData the response MessageData to be parsed
* @return a T built from the response MessageData
* @throws InvalidPeerTaskResponseException if the response messageData is invalid
*/
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException;
/**
* Gets the number of times this task may be attempted against other peers
*
* @return the number of times this task may be attempted against other peers
*/
default int getRetriesWithOtherPeer() {
return 5;
}
/**
* Gets the number of times this task may be attempted against the same peer
*
* @return the number of times this task may be attempted against the same peer
*/
default int getRetriesWithSamePeer() {
return 5;
}
/**
* Gets a Predicate that checks if an EthPeer is suitable for this PeerTask
*
* @return a Predicate that checks if an EthPeer is suitable for this PeerTask
*/
Predicate<EthPeer> getPeerRequirementFilter();
/**
* Checks if the supplied result is considered a success
*
* @return true if the supplied result is considered a success
*/
boolean isSuccess(T result);
}

View File

@@ -0,0 +1,196 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledGauge;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */
public class PeerTaskExecutor {
private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender;
private final LabelledMetric<OperationTimer> requestTimer;
private final LabelledMetric<Counter> timeoutCounter;
private final LabelledMetric<Counter> invalidResponseCounter;
private final LabelledMetric<Counter> internalExceptionCounter;
private final LabelledGauge inflightRequestGauge;
private final Map<String, AtomicInteger> inflightRequestCountByClassName;
public PeerTaskExecutor(
final PeerSelector peerSelector,
final PeerTaskRequestSender requestSender,
final MetricsSystem metricsSystem) {
this.peerSelector = peerSelector;
this.requestSender = requestSender;
requestTimer =
metricsSystem.createLabelledTimer(
BesuMetricCategory.PEERS,
"request_time",
"Time taken to send a request and receive a response",
"taskName");
timeoutCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"timeout_total",
"Counter of the number of timeouts occurred",
"taskName");
invalidResponseCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"invalid_response_total",
"Counter of the number of invalid responses received",
"taskName");
internalExceptionCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"internal_exception_total",
"Counter of the number of internal exceptions occurred",
"taskName");
inflightRequestGauge =
metricsSystem.createLabelledGauge(
BesuMetricCategory.PEERS,
"inflight_request_gauge",
"Gauge of the number of inflight requests",
"taskName");
inflightRequestCountByClassName = new ConcurrentHashMap<>();
}
public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
PeerTaskExecutorResult<T> executorResult;
int retriesRemaining = peerTask.getRetriesWithOtherPeer();
final Collection<EthPeer> usedEthPeers = new HashSet<>();
do {
Optional<EthPeer> peer =
peerSelector.getPeer(
(candidatePeer) ->
peerTask.getPeerRequirementFilter().test(candidatePeer)
&& !usedEthPeers.contains(candidatePeer));
if (peer.isEmpty()) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
continue;
}
usedEthPeers.add(peer.get());
executorResult = executeAgainstPeer(peerTask, peer.get());
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS);
return executorResult;
}
public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
final PeerTask<T> peerTask, final EthPeer peer) {
String taskClassName = peerTask.getClass().getSimpleName();
AtomicInteger inflightRequestCountForThisTaskClass =
inflightRequestCountByClassName.computeIfAbsent(
taskClassName,
(k) -> {
AtomicInteger inflightRequests = new AtomicInteger(0);
inflightRequestGauge.labels(inflightRequests::get, taskClassName);
return inflightRequests;
});
MessageData requestMessageData = peerTask.getRequestMessage();
PeerTaskExecutorResult<T> executorResult;
int retriesRemaining = peerTask.getRetriesWithSamePeer();
do {
try {
T result;
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(taskClassName).startTimer()) {
inflightRequestCountForThisTaskClass.incrementAndGet();
MessageData responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);
result = peerTask.parseResponse(responseMessageData);
} finally {
inflightRequestCountForThisTaskClass.decrementAndGet();
}
if (peerTask.isSuccess(result)) {
peer.recordUsefulResponse();
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);
} else {
// At this point, the result is most likely empty. Technically, this is a valid result, so
// we don't penalise the peer, but it's also a useless result, so we return
// INVALID_RESPONSE code
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
}
} catch (PeerNotConnected e) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
} catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode());
timeoutCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
} catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage());
invalidResponseCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
} catch (ExecutionException e) {
internalExceptionCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
}
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
&& sleepBetweenRetries());
return executorResult;
}
private boolean sleepBetweenRetries() {
try {
// sleep for 1 second to match implemented wait between retries in AbstractRetryingPeerTask
Thread.sleep(1000);
return true;
} catch (InterruptedException e) {
return false;
}
}
}

View File

@@ -0,0 +1,24 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public enum PeerTaskExecutorResponseCode {
SUCCESS,
NO_PEER_AVAILABLE,
PEER_DISCONNECTED,
INTERNAL_SERVER_ERROR,
TIMEOUT,
INVALID_RESPONSE
}

View File

@@ -0,0 +1,20 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import java.util.Optional;
public record PeerTaskExecutorResult<T>(
Optional<T> result, PeerTaskExecutorResponseCode responseCode) {}

View File

@@ -0,0 +1,56 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PeerTaskRequestSender {
private static final long DEFAULT_TIMEOUT_MS = 5_000;
private final long timeoutMs;
public PeerTaskRequestSender() {
this.timeoutMs = DEFAULT_TIMEOUT_MS;
}
public PeerTaskRequestSender(final long timeoutMs) {
this.timeoutMs = timeoutMs;
}
public MessageData sendRequest(
final SubProtocol subProtocol, final MessageData requestMessageData, final EthPeer ethPeer)
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
ResponseStream responseStream =
ethPeer.send(requestMessageData, subProtocol.getName(), ethPeer.getConnection());
final CompletableFuture<MessageData> responseMessageDataFuture = new CompletableFuture<>();
responseStream.then(
(boolean streamClosed, MessageData message, EthPeer peer) -> {
responseMessageDataFuture.complete(message);
});
return responseMessageDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
}
}

View File

@@ -54,13 +54,6 @@ abstract class AbstractPeerBlockValidator implements PeerValidator {
this.chainHeightEstimationBuffer = chainHeightEstimationBuffer;
}
protected AbstractPeerBlockValidator(
final ProtocolSchedule protocolSchedule,
final MetricsSystem metricsSystem,
final long blockNumber) {
this(protocolSchedule, metricsSystem, blockNumber, DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER);
}
@Override
public CompletableFuture<Boolean> validatePeer(
final EthContext ethContext, final EthPeer ethPeer) {

View File

@@ -34,7 +34,8 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
public class PeerTransactionTracker
implements EthPeer.DisconnectCallback, PendingTransactionDroppedListener {
private static final Logger LOG = LoggerFactory.getLogger(PeerTransactionTracker.class);
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
@@ -122,13 +123,14 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
}
private <T> Set<T> createTransactionsSet() {
return Collections.newSetFromMap(
new LinkedHashMap<>(1 << 4, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
return size() > MAX_TRACKED_SEEN_TRANSACTIONS;
}
});
return Collections.synchronizedSet(
Collections.newSetFromMap(
new LinkedHashMap<>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
return size() > MAX_TRACKED_SEEN_TRANSACTIONS;
}
}));
}
@Override
@@ -175,4 +177,11 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private String logPeerSet(final Set<EthPeer> peers) {
return peers.stream().map(EthPeer::getLoggableId).collect(Collectors.joining(","));
}
@Override
public void onTransactionDropped(final Transaction transaction, final RemovalReason reason) {
if (reason.stopTracking()) {
seenTransactions.values().stream().forEach(st -> st.remove(transaction.getHash()));
}
}
}

View File

@@ -19,5 +19,5 @@ import org.hyperledger.besu.ethereum.core.Transaction;
@FunctionalInterface
public interface PendingTransactionDroppedListener {
void onTransactionDropped(Transaction transaction);
void onTransactionDropped(Transaction transaction, final RemovalReason reason);
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
/** The reason why a pending tx has been removed */
public interface RemovalReason {
/**
* Return a label that identify this reason to be used in the metric system.
*
* @return a label
*/
String label();
/**
* Return true if we should stop tracking the tx as already seen
*
* @return true if no more tracking is needed
*/
boolean stopTracking();
}

View File

@@ -137,7 +137,8 @@ public class TransactionPool implements BlockAddedObserver {
initializeBlobMetrics();
initLogForReplay();
subscribePendingTransactions(this::mapBlobsOnTransactionAdded);
subscribeDroppedTransactions(this::unmapBlobsOnTransactionDropped);
subscribeDroppedTransactions(
(transaction, reason) -> unmapBlobsOnTransactionDropped(transaction));
}
private void initLogForReplay() {
@@ -720,7 +721,9 @@ public class TransactionPool implements BlockAddedObserver {
void subscribe() {
onAddedListenerId = pendingTransactions.subscribePendingTransactions(this::onAdded);
onDroppedListenerId = pendingTransactions.subscribeDroppedTransactions(this::onDropped);
onDroppedListenerId =
pendingTransactions.subscribeDroppedTransactions(
(transaction, reason) -> onDropped(transaction, reason));
}
void unsubscribe() {
@@ -728,8 +731,8 @@ public class TransactionPool implements BlockAddedObserver {
pendingTransactions.unsubscribeDroppedTransactions(onDroppedListenerId);
}
private void onDropped(final Transaction transaction) {
onDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction));
private void onDropped(final Transaction transaction, final RemovalReason reason) {
onDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction, reason));
}
private void onAdded(final Transaction transaction) {

View File

@@ -137,7 +137,7 @@ public abstract class AbstractPrioritizedTransactions extends AbstractSequential
protected void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction removedTx,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
orderByFee.remove(removedTx);
}

View File

@@ -15,8 +15,8 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@@ -24,7 +24,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;
import java.util.Map;
import java.util.NavigableMap;

View File

@@ -20,12 +20,12 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedRes
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.BELOW_MIN_SCORE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.RemovedFrom.POOL;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.BELOW_MIN_SCORE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.RemovedFrom.POOL;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@@ -295,7 +295,9 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
if (remainingPromotionsPerType[txType.ordinal()] > 0) {
senderTxs.pollFirstEntry();
processRemove(
senderTxs, candidateTx.getTransaction(), RemovalReason.LayerMoveReason.PROMOTED);
senderTxs,
candidateTx.getTransaction(),
LayeredRemovalReason.LayerMoveReason.PROMOTED);
metrics.incrementRemoved(candidateTx, "promoted", name());
if (senderTxs.isEmpty()) {
@@ -386,7 +388,7 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
decreaseCounters(replacedTx);
metrics.incrementRemoved(replacedTx, REPLACED.label(), name());
internalReplaced(replacedTx);
notifyTransactionDropped(replacedTx);
notifyTransactionDropped(replacedTx, REPLACED);
}
protected abstract void internalReplaced(final PendingTransaction replacedTx);
@@ -415,7 +417,7 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
protected PendingTransaction processRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final Transaction transaction,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
final PendingTransaction removedTx = pendingTransactions.remove(transaction.getHash());
if (removedTx != null) {
@@ -423,7 +425,7 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
metrics.incrementRemoved(removedTx, removalReason.label(), name());
internalRemove(senderTxs, removedTx, removalReason);
if (removalReason.removedFrom().equals(POOL)) {
notifyTransactionDropped(removedTx);
notifyTransactionDropped(removedTx, removalReason);
}
}
return removedTx;
@@ -432,7 +434,7 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
protected PendingTransaction processEvict(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction evictedTx,
final RemovalReason reason) {
final LayeredRemovalReason reason) {
final PendingTransaction removedTx = pendingTransactions.remove(evictedTx.getHash());
if (removedTx != null) {
decreaseCounters(evictedTx);
@@ -545,7 +547,7 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
protected abstract void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction,
final RemovalReason removalReason);
final LayeredRemovalReason removalReason);
protected abstract PendingTransaction getEvictable();
@@ -606,9 +608,10 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
listener -> listener.onTransactionAdded(pendingTransaction.getTransaction()));
}
protected void notifyTransactionDropped(final PendingTransaction pendingTransaction) {
protected void notifyTransactionDropped(
final PendingTransaction pendingTransaction, final LayeredRemovalReason reason) {
onDroppedListeners.forEach(
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction()));
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction(), reason));
}
@Override

View File

@@ -15,7 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.DEMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.DEMOTED;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;

View File

@@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.DROPPED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@@ -25,7 +25,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedLis
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.util.Subscribers;
@@ -78,7 +78,7 @@ public class EndLayer implements TransactionsLayer {
@Override
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason reason) {
notifyTransactionDropped(pendingTransaction);
notifyTransactionDropped(pendingTransaction, DROPPED);
metrics.incrementRemoved(pendingTransaction, DROPPED.label(), name());
++droppedCount;
return TransactionAddedResult.DROPPED;
@@ -152,9 +152,10 @@ public class EndLayer implements TransactionsLayer {
onDroppedListeners.unsubscribe(id);
}
protected void notifyTransactionDropped(final PendingTransaction pendingTransaction) {
protected void notifyTransactionDropped(
final PendingTransaction pendingTransaction, final LayeredRemovalReason reason) {
onDroppedListeners.forEach(
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction()));
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction(), reason));
}
@Override

View File

@@ -21,8 +21,8 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedRes
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.RECONCILED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.RECONCILED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;

View File

@@ -14,10 +14,12 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;
import java.util.Locale;
/** The reason why a pending tx has been removed */
public interface RemovalReason {
interface LayeredRemovalReason extends RemovalReason {
/**
* From where the tx has been removed
*
@@ -25,13 +27,6 @@ public interface RemovalReason {
*/
RemovedFrom removedFrom();
/**
* Return a label that identify this reason to be used in the metric system.
*
* @return a label
*/
String label();
/** There are 2 kinds of removals, from a layer and from the pool. */
enum RemovedFrom {
/**
@@ -50,37 +45,53 @@ public interface RemovalReason {
}
/** The reason why the tx has been removed from the pool */
enum PoolRemovalReason implements RemovalReason {
/** Tx removed since it is confirmed on chain, as part of an imported block. */
CONFIRMED,
/** Tx removed since it has been replaced by another one added in the same layer. */
REPLACED,
/** Tx removed since it has been replaced by another one added in another layer. */
CROSS_LAYER_REPLACED,
/** Tx removed when the pool is full, to make space for new incoming txs. */
DROPPED,
enum PoolRemovalReason implements LayeredRemovalReason {
/**
* Tx removed since it is confirmed on chain, as part of an imported block. Keep tracking since
* makes no sense to reprocess a confirmed.
*/
CONFIRMED(false),
/**
* Tx removed since it has been replaced by another one added in the same layer. Keep tracking
* since makes no sense to reprocess a replaced tx.
*/
REPLACED(false),
/**
* Tx removed since it has been replaced by another one added in another layer. Keep tracking
* since makes no sense to reprocess a replaced tx.
*/
CROSS_LAYER_REPLACED(false),
/**
* Tx removed when the pool is full, to make space for new incoming txs. Stop tracking it so we
* could re-accept it in the future.
*/
DROPPED(true),
/**
* Tx removed since found invalid after it was added to the pool, for example during txs
* selection for a new block proposal.
* selection for a new block proposal. Keep tracking since we do not want to reprocess an
* invalid tx.
*/
INVALIDATED,
INVALIDATED(false),
/**
* Special case, when for a sender, discrepancies are found between the world state view and the
* pool view, then all the txs for this sender are removed and added again. Discrepancies, are
* rare, and can happen during a short windows when a new block is being imported and the world
* state being updated.
* state being updated. Keep tracking since it is removed and re-added.
*/
RECONCILED,
RECONCILED(false),
/**
* When a pending tx is penalized its score is decreased, if at some point its score is lower
* than the configured minimum then the pending tx is removed from the pool.
* than the configured minimum then the pending tx is removed from the pool. Stop tracking it so
* we could re-accept it in the future.
*/
BELOW_MIN_SCORE;
BELOW_MIN_SCORE(true);
private final String label;
private final boolean stopTracking;
PoolRemovalReason() {
PoolRemovalReason(final boolean stopTracking) {
this.label = name().toLowerCase(Locale.ROOT);
this.stopTracking = stopTracking;
}
@Override
@@ -92,10 +103,15 @@ public interface RemovalReason {
public String label() {
return label;
}
@Override
public boolean stopTracking() {
return stopTracking;
}
}
/** The reason why the tx has been moved across layers */
enum LayerMoveReason implements RemovalReason {
enum LayerMoveReason implements LayeredRemovalReason {
/**
* When the current layer is full, and this tx needs to be moved to the lower layer, in order to
* free space.
@@ -132,5 +148,15 @@ public interface RemovalReason {
public String label() {
return label;
}
/**
* We need to continue to track a tx when is moved between layers
*
* @return always false
*/
@Override
public boolean stopTracking() {
return false;
}
}
}

View File

@@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.PROMOTED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@@ -109,7 +109,7 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
protected void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction removedTx,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
orderByMaxFee.remove(removedTx);
if (!senderTxs.isEmpty()) {
orderByMaxFee.add(senderTxs.firstEntry().getValue());

View File

@@ -14,8 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.INVALIDATED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@@ -26,7 +26,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import java.util.ArrayList;
@@ -278,7 +278,7 @@ public class SparseTransactions extends AbstractTransactionsLayer {
protected void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction removedTx,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
sparseEvictionOrder.remove(removedTx);

View File

@@ -22,7 +22,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import java.util.List;

View File

@@ -18,6 +18,10 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedRes
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.INVALID;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.TIMED_EVICTION;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@@ -160,7 +164,7 @@ public abstract class AbstractPendingTransactionsSorter implements PendingTransa
.setMessage("Evicted {} due to age")
.addArgument(transactionInfo::toTraceLog)
.log();
removeTransaction(transactionInfo.getTransaction());
removeTransaction(transactionInfo.getTransaction(), TIMED_EVICTION);
});
}
@@ -196,9 +200,9 @@ public abstract class AbstractPendingTransactionsSorter implements PendingTransa
return transactionAddedStatus;
}
void removeTransaction(final Transaction transaction) {
void removeTransaction(final Transaction transaction, final SequencedRemovalReason reason) {
removeTransaction(transaction, false);
notifyTransactionDropped(transaction);
notifyTransactionDropped(transaction, reason);
}
@Override
@@ -256,12 +260,12 @@ public abstract class AbstractPendingTransactionsSorter implements PendingTransa
}
if (result.stop()) {
transactionsToRemove.forEach(this::removeTransaction);
transactionsToRemove.forEach(tx -> removeTransaction(tx, INVALID));
return;
}
}
}
transactionsToRemove.forEach(this::removeTransaction);
transactionsToRemove.forEach(tx -> removeTransaction(tx, INVALID));
}
}
@@ -324,7 +328,7 @@ public abstract class AbstractPendingTransactionsSorter implements PendingTransa
.setMessage("Tracked transaction by sender {}")
.addArgument(pendingTxsForSender::toTraceLog)
.log();
maybeReplacedTransaction.ifPresent(this::removeTransaction);
maybeReplacedTransaction.ifPresent(tx -> removeTransaction(tx, REPLACED));
return ADDED;
}
@@ -354,8 +358,10 @@ public abstract class AbstractPendingTransactionsSorter implements PendingTransa
pendingTransactionSubscribers.forEach(listener -> listener.onTransactionAdded(transaction));
}
private void notifyTransactionDropped(final Transaction transaction) {
transactionDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction));
private void notifyTransactionDropped(
final Transaction transaction, final SequencedRemovalReason reason) {
transactionDroppedListeners.forEach(
listener -> listener.onTransactionDropped(transaction, reason));
}
@Override
@@ -491,7 +497,7 @@ public abstract class AbstractPendingTransactionsSorter implements PendingTransa
// remove backward to avoid gaps
for (int i = txsToEvict.size() - 1; i >= 0; i--) {
removeTransaction(txsToEvict.get(i).getTransaction());
removeTransaction(txsToEvict.get(i).getTransaction(), EVICTED);
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.sorter;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;
import java.util.Locale;
/** The reason why a pending tx has been removed */
enum SequencedRemovalReason implements RemovalReason {
EVICTED(true),
TIMED_EVICTION(true),
REPLACED(false),
INVALID(false);
private final String label;
private final boolean stopTracking;
SequencedRemovalReason(final boolean stopTracking) {
this.label = name().toLowerCase(Locale.ROOT);
this.stopTracking = stopTracking;
}
@Override
public String label() {
return label;
}
@Override
public boolean stopTracking() {
return stopTracking;
}
}

View File

@@ -0,0 +1,290 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
public class PeerTaskExecutorTest {
private @Mock PeerSelector peerSelector;
private @Mock PeerTaskRequestSender requestSender;
private @Mock PeerTask<Object> peerTask;
private @Mock SubProtocol subprotocol;
private @Mock MessageData requestMessageData;
private @Mock MessageData responseMessageData;
private @Mock EthPeer ethPeer;
private AutoCloseable mockCloser;
private PeerTaskExecutor peerTaskExecutor;
@BeforeEach
public void beforeTest() {
mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem());
}
@AfterEach
public void afterTest() throws Exception {
mockCloser.close();
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndSuccessfulFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndPartialSuccessfulFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(false);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithRetriesAndSuccessfulFlowAfterFirstFailure()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
int requestMessageDataCode = 123;
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(2);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException())
.thenReturn(responseMessageData);
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndPeerNotConnected()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new PeerConnection.PeerNotConnected(""));
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals(PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndTimeoutException()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
int requestMessageDataCode = 123;
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals(PeerTaskExecutorResponseCode.TIMEOUT, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndInvalidResponseMessage()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData))
.thenThrow(new InvalidPeerTaskResponseException());
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUselessResponse(null);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode());
}
@Test
@SuppressWarnings("unchecked")
public void testExecuteWithNoRetriesAndSuccessFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(Optional.of(ethPeer));
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(0);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
@Test
@SuppressWarnings("unchecked")
public void testExecuteWithPeerSwitchingAndSuccessFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
int requestMessageDataCode = 123;
EthPeer peer2 = Mockito.mock(EthPeer.class);
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(Optional.of(ethPeer))
.thenReturn(Optional.of(peer2));
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(2);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.execute(peerTask);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Mockito.verify(peer2).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
public class PeerTaskRequestSenderTest {
private PeerTaskRequestSender peerTaskRequestSender;
@BeforeEach
public void beforeTest() {
peerTaskRequestSender = new PeerTaskRequestSender();
}
@Test
public void testSendRequest()
throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException {
SubProtocol subprotocol = Mockito.mock(SubProtocol.class);
MessageData requestMessageData = Mockito.mock(MessageData.class);
MessageData responseMessageData = Mockito.mock(MessageData.class);
EthPeer peer = Mockito.mock(EthPeer.class);
PeerConnection peerConnection = Mockito.mock(PeerConnection.class);
RequestManager.ResponseStream responseStream =
Mockito.mock(RequestManager.ResponseStream.class);
Mockito.when(peer.getConnection()).thenReturn(peerConnection);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(peer.send(requestMessageData, "subprotocol", peerConnection))
.thenReturn(responseStream);
CompletableFuture<MessageData> actualResponseMessageDataFuture =
CompletableFuture.supplyAsync(
() -> {
try {
return peerTaskRequestSender.sendRequest(subprotocol, requestMessageData, peer);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Thread.sleep(500);
ArgumentCaptor<RequestManager.ResponseCallback> responseCallbackArgumentCaptor =
ArgumentCaptor.forClass(RequestManager.ResponseCallback.class);
Mockito.verify(responseStream).then(responseCallbackArgumentCaptor.capture());
RequestManager.ResponseCallback responseCallback = responseCallbackArgumentCaptor.getValue();
responseCallback.exec(false, responseMessageData, peer);
Assertions.assertSame(responseMessageData, actualResponseMessageDataFuture.get());
}
}

View File

@@ -65,6 +65,28 @@ public class PeerTransactionTrackerTest {
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(transaction3);
}
@Test
public void shouldStopTrackingSeenTransactionsWhenRemovalReasonSaysSo() {
tracker.markTransactionsAsSeen(ethPeer1, ImmutableSet.of(transaction2));
assertThat(tracker.hasSeenTransaction(transaction2.getHash())).isTrue();
tracker.onTransactionDropped(transaction2, createRemovalReason(true));
assertThat(tracker.hasSeenTransaction(transaction2.getHash())).isFalse();
}
@Test
public void shouldKeepTrackingSeenTransactionsWhenRemovalReasonSaysSo() {
tracker.markTransactionsAsSeen(ethPeer1, ImmutableSet.of(transaction2));
assertThat(tracker.hasSeenTransaction(transaction2.getHash())).isTrue();
tracker.onTransactionDropped(transaction2, createRemovalReason(false));
assertThat(tracker.hasSeenTransaction(transaction2.getHash())).isTrue();
}
@Test
public void shouldExcludeAlreadySeenTransactionsAsACollectionFromTransactionsToSend() {
tracker.markTransactionsAsSeen(ethPeer1, ImmutableSet.of(transaction1, transaction2));
@@ -125,4 +147,19 @@ public class PeerTransactionTrackerTest {
assertThat(tracker.hasPeerSeenTransaction(ethPeer1, transaction1)).isFalse();
assertThat(tracker.hasPeerSeenTransaction(ethPeer2, transaction2)).isFalse();
}
private RemovalReason createRemovalReason(final boolean stopTracking) {
return new RemovalReason() {
@Override
public String label() {
return "";
}
@Override
public boolean stopTracking() {
return stopTracking;
}
};
}
}

View File

@@ -22,8 +22,8 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedRes
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOB_PRICE_BELOW_CURRENT_MIN;
@@ -281,7 +281,7 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {
assertThat(smallLayers.evictedCollector.getEvictedTransactions())
.map(PendingTransaction::getTransaction)
.contains(firstTxs.get(0));
verify(droppedListener).onTransactionDropped(firstTxs.get(0));
verify(droppedListener).onTransactionDropped(firstTxs.get(0), DROPPED);
}
@Test

View File

@@ -21,13 +21,13 @@ import static org.hyperledger.besu.datatypes.TransactionType.ACCESS_LIST;
import static org.hyperledger.besu.datatypes.TransactionType.BLOB;
import static org.hyperledger.besu.datatypes.TransactionType.EIP1559;
import static org.hyperledger.besu.datatypes.TransactionType.FRONTIER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S1;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S2;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S3;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S4;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.SP1;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.SP2;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -1409,7 +1409,8 @@ public class LayersTest extends BaseTransactionPoolTest {
this.pending = new LayeredPendingTransactions(poolConfig, this.prio, ethScheduler);
this.pending.subscribePendingTransactions(notificationsChecker::collectAddNotification);
this.pending.subscribeDroppedTransactions(notificationsChecker::collectDropNotification);
this.pending.subscribeDroppedTransactions(
(tx, reason) -> notificationsChecker.collectDropNotification(tx));
}
@Override

View File

@@ -16,7 +16,7 @@ package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.INVALIDATED;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

View File

@@ -18,6 +18,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.INVALID;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.SequencedRemovalReason.TIMED_EVICTION;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -292,9 +296,9 @@ public abstract class AbstractPendingTransactionsTestBase {
transactions.subscribeDroppedTransactions(droppedListener);
transactions.removeTransaction(transaction1);
transactions.removeTransaction(transaction1, TIMED_EVICTION);
verify(droppedListener).onTransactionDropped(transaction1);
verify(droppedListener).onTransactionDropped(transaction1, TIMED_EVICTION);
}
@Test
@@ -304,13 +308,13 @@ public abstract class AbstractPendingTransactionsTestBase {
final long id = transactions.subscribeDroppedTransactions(droppedListener);
transactions.removeTransaction(transaction1);
transactions.removeTransaction(transaction1, EVICTED);
verify(droppedListener).onTransactionDropped(transaction1);
verify(droppedListener).onTransactionDropped(transaction1, EVICTED);
transactions.unsubscribeDroppedTransactions(id);
transactions.removeTransaction(transaction2);
transactions.removeTransaction(transaction2, EVICTED);
verifyNoMoreInteractions(droppedListener);
}
@@ -321,9 +325,9 @@ public abstract class AbstractPendingTransactionsTestBase {
transactions.subscribeDroppedTransactions(droppedListener);
transactions.removeTransaction(transaction1);
transactions.removeTransaction(transaction1, REPLACED);
verify(droppedListener).onTransactionDropped(transaction1);
verify(droppedListener).onTransactionDropped(transaction1, REPLACED);
}
@Test
@@ -473,7 +477,7 @@ public abstract class AbstractPendingTransactionsTestBase {
public void shouldReturnEmptyOptionalAsMaximumNonceWhenLastTransactionForSenderRemoved() {
final Transaction transaction = transactionWithNonceAndSender(1, KEYS1);
transactions.addTransaction(createRemotePendingTransaction(transaction), Optional.empty());
transactions.removeTransaction(transaction);
transactions.removeTransaction(transaction, INVALID);
assertThat(transactions.getNextNonceForSender(SENDER1)).isEmpty();
}
@@ -822,6 +826,8 @@ public abstract class AbstractPendingTransactionsTestBase {
.build(),
Optional.of(clock));
twoHourEvictionTransactionPool.subscribeDroppedTransactions(droppedListener);
twoHourEvictionTransactionPool.addTransaction(
createRemotePendingTransaction(transaction1, clock.millis()), Optional.empty());
assertThat(twoHourEvictionTransactionPool.size()).isEqualTo(1);
@@ -832,6 +838,7 @@ public abstract class AbstractPendingTransactionsTestBase {
twoHourEvictionTransactionPool.evictOldTransactions();
assertThat(twoHourEvictionTransactionPool.size()).isEqualTo(1);
assertThat(metricsSystem.getCounterValue(REMOVED_COUNTER, REMOTE, DROPPED)).isEqualTo(1);
verify(droppedListener).onTransactionDropped(transaction1, TIMED_EVICTION);
}
@Test
@@ -949,7 +956,8 @@ public abstract class AbstractPendingTransactionsTestBase {
public void shouldPrioritizeGasPriceThenTimeAddedToPool() {
// Make sure the 100 gas price TX isn't dropped
transactions.subscribeDroppedTransactions(
transaction -> assertThat(transaction.getGasPrice().get().toLong()).isLessThan(100));
(transaction, reason) ->
assertThat(transaction.getGasPrice().get().toLong()).isLessThan(100));
// Fill the pool with transactions from random senders
final List<Transaction> lowGasPriceTransactions =

View File

@@ -253,5 +253,14 @@ public class NoOpMetricsSystem implements ObservableMetricsSystem {
"The count of labels used must match the count of labels expected.");
Preconditions.checkNotNull(valueSupplier, "No valueSupplier specified");
}
@Override
public boolean isLabelsObserved(final String... labelValues) {
Preconditions.checkArgument(
labelValues.length == labelCount,
"The count of labels used must match the count of labels expected.");
final String labelValuesString = String.join(",", labelValues);
return labelValuesCache.contains(labelValuesString);
}
}
}

View File

@@ -36,4 +36,10 @@ public class NoOpValueCollector implements LabelledGauge {
}
labelValuesCreated.add(labelValuesString);
}
@Override
public boolean isLabelsObserved(final String... labelValues) {
final String labelValuesString = String.join(",", labelValues);
return labelValuesCreated.contains(labelValuesString);
}
}

View File

@@ -63,6 +63,14 @@ public class OpenTelemetryGauge implements LabelledGauge {
}
}
@Override
public boolean isLabelsObserved(final String... labelValues) {
Preconditions.checkArgument(
labelValues.length == labelNames.size(),
"label values and label names need the same number of elements");
return observationsMap.containsKey(getLabels(labelValues));
}
private Attributes getLabels(final String... labelValues) {
final AttributesBuilder labelsBuilder = Attributes.builder();
for (int i = 0; i < labelNames.size(); i++) {

View File

@@ -47,10 +47,7 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
@Override
public synchronized void labels(final DoubleSupplier valueSupplier, final String... labelValues) {
if (labelValues.length != labelNames.size()) {
throw new IllegalArgumentException(
"Label values and label names must be the same cardinality");
}
validateLabelsCardinality(labelValues);
if (observationsMap.putIfAbsent(List.of(labelValues), valueSupplier) != null) {
final String labelValuesString = String.join(",", labelValues);
throw new IllegalArgumentException(
@@ -58,6 +55,12 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
}
}
@Override
public boolean isLabelsObserved(final String... labelValues) {
validateLabelsCardinality(labelValues);
return observationsMap.containsKey(List.of(labelValues));
}
@Override
public List<MetricFamilySamples> collect() {
final List<MetricFamilySamples.Sample> samples = new ArrayList<>();
@@ -68,4 +71,11 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
metricName, labelNames, labels, valueSupplier.getAsDouble())));
return List.of(new MetricFamilySamples(metricName, Type.GAUGE, help, samples));
}
private void validateLabelsCardinality(final String... labelValues) {
if (labelValues.length != labelNames.size()) {
throw new IllegalArgumentException(
"Label values and label names must be the same cardinality");
}
}
}

View File

@@ -71,7 +71,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files
knownHash = '4jVaj9yW88nHbX0KmTR3dPQRvj9x8Pvh5E9Ry7KRT6w='
knownHash = 'WRdnBaP05fItpWHYSFz/vBBlRWL3sLGqzR3tzd+pOkA='
}
check.dependsOn('checkAPIChanges')

View File

@@ -25,4 +25,15 @@ public interface LabelledGauge {
* @param labelValues the label values
*/
void labels(final DoubleSupplier valueSupplier, final String... labelValues);
/**
* Checks whether the supplied labelValues are already observed by this LabelledGauge
*
* @param labelValues The labelValues to check
* @return true if the supplied labelValues are already observed by this LabelledGauge, false
* otherwise
*/
default boolean isLabelsObserved(final String... labelValues) {
return false;
}
}