[BESU-176] JRPC response to eth_getTransactionCount is different from other node implementations. (#351)

* Fix strategy for getting next nonce for sender in `PendingTransactions`.
Added test.

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>

* Introduced `TransactionsForSenderInfo` structure to have a less naive implementation, more robust against load testing.
A queue of gaps between used nonces is maintained.
`getNextNonceForSender` has been updated and have the following logic:
- if no infos corresponding to the nonce returns an empty optional
- if gap list is not empty returns the lowest nonce in the gap
- else returns the max nonce + 1

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>

* spotless

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>

* spotless

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>

* added test

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>

* spotless

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>
This commit is contained in:
Abdelhamid Bakhta
2020-02-03 20:27:24 +01:00
committed by GitHub
parent 7a7416f74e
commit d7d82e417d
3 changed files with 112 additions and 17 deletions

View File

@@ -34,12 +34,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -62,8 +61,7 @@ public class PendingTransactions {
comparing(TransactionInfo::isReceivedFromLocalSource)
.thenComparing(TransactionInfo::getSequence)
.reversed());
private final Map<Address, SortedMap<Long, TransactionInfo>> transactionsBySender =
new HashMap<>();
private final Map<Address, TransactionsForSenderInfo> transactionsBySender = new HashMap<>();
private final Subscribers<PendingTransactionListener> pendingTransactionSubscribers =
Subscribers.create();
@@ -206,7 +204,8 @@ public class PendingTransactions {
private AccountTransactionOrder createSenderTransactionOrder(final Address address) {
return new AccountTransactionOrder(
transactionsBySender.get(address).values().stream().map(TransactionInfo::getTransaction));
transactionsBySender.get(address).getTransactionsInfos().values().stream()
.map(TransactionInfo::getTransaction));
}
private boolean addTransaction(final TransactionInfo transactionInfo) {
@@ -247,27 +246,30 @@ public class PendingTransactions {
}
private void trackTransactionBySenderAndNonce(final TransactionInfo transactionInfo) {
final Map<Long, TransactionInfo> transactionsForSender =
transactionsBySender.computeIfAbsent(transactionInfo.getSender(), key -> new TreeMap<>());
transactionsForSender.put(transactionInfo.getNonce(), transactionInfo);
final TransactionsForSenderInfo transactionsForSenderInfo =
transactionsBySender.computeIfAbsent(
transactionInfo.getSender(), key -> new TransactionsForSenderInfo());
transactionsForSenderInfo.addTransactionToTrack(transactionInfo.getNonce(), transactionInfo);
}
private void removeTransactionTrackedBySenderAndNonce(final Transaction transaction) {
Optional.ofNullable(transactionsBySender.get(transaction.getSender()))
.ifPresent(
transactionsForSender -> {
transactionsForSender.remove(transaction.getNonce());
if (transactionsForSender.isEmpty()) {
transactionsForSender.getTransactionsInfos().remove(transaction.getNonce());
if (transactionsForSender.getTransactionsInfos().isEmpty()) {
transactionsBySender.remove(transaction.getSender());
transactionsForSender.updateGaps();
}
});
}
private TransactionInfo getTrackedTransactionBySenderAndNonce(
final TransactionInfo transactionInfo) {
final Map<Long, TransactionInfo> transactionsForSender =
transactionsBySender.computeIfAbsent(transactionInfo.getSender(), key -> new TreeMap<>());
return transactionsForSender.get(transactionInfo.getNonce());
final TransactionsForSenderInfo transactionsForSenderInfo =
transactionsBySender.computeIfAbsent(
transactionInfo.getSender(), key -> new TransactionsForSenderInfo());
return transactionsForSenderInfo.getTransactionsInfos().get(transactionInfo.getNonce());
}
private boolean shouldReplace(
@@ -326,12 +328,15 @@ public class PendingTransactions {
public OptionalLong getNextNonceForSender(final Address sender) {
synchronized (pendingTransactions) {
final SortedMap<Long, TransactionInfo> transactionsForSender =
transactionsBySender.get(sender);
if (transactionsForSender == null) {
final TransactionsForSenderInfo transactionsForSenderInfo = transactionsBySender.get(sender);
if (transactionsForSenderInfo == null
|| transactionsForSenderInfo.getTransactionsInfos().isEmpty()) {
return OptionalLong.empty();
} else if (!transactionsForSenderInfo.getGaps().isEmpty()) {
return OptionalLong.of(Objects.requireNonNull(transactionsForSenderInfo.getGaps().poll()));
} else {
return OptionalLong.of(transactionsForSenderInfo.getTransactionsInfos().lastKey() + 1);
}
return OptionalLong.of(transactionsForSender.lastKey() + 1);
}
}

View File

@@ -0,0 +1,67 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.LongStream;
class TransactionsForSenderInfo {
private final SortedMap<Long, PendingTransactions.TransactionInfo> transactionsInfos;
private final Queue<Long> gaps = new PriorityQueue<>();
TransactionsForSenderInfo() {
transactionsInfos = new TreeMap<>();
}
void addTransactionToTrack(
final long nonce, final PendingTransactions.TransactionInfo transactionInfo) {
if (!transactionsInfos.isEmpty()) {
updateGapsOnNewTransaction(nonce);
}
transactionsInfos.put(nonce, transactionInfo);
}
private void updateGapsOnNewTransaction(final long nonce) {
final long highestNonce = transactionsInfos.lastKey();
if (nonce > (highestNonce + 1)) {
LongStream.range(highestNonce + 1, nonce).forEach(gaps::add);
}
}
void updateGaps() {
synchronized (transactionsInfos) {
final Iterator<Long> nonceIterator = transactionsInfos.keySet().iterator();
long previousNonce = -1;
while (nonceIterator.hasNext()) {
final long currentNonce = nonceIterator.next();
LongStream.range(previousNonce + 1, currentNonce).forEach(gaps::add);
previousNonce = currentNonce;
}
}
}
SortedMap<Long, PendingTransactions.TransactionInfo> getTransactionsInfos() {
return transactionsInfos;
}
Queue<Long> getGaps() {
return gaps;
}
}

View File

@@ -619,4 +619,27 @@ public class PendingTransactionsTest {
assertThat(metricsSystem.getCounterValue(ADDED_COUNTER, LOCAL)).isEqualTo(0);
assertThat(metricsSystem.getCounterValue(ADDED_COUNTER, REMOTE)).isEqualTo(1);
}
@Test
public void assertThatCorrectNonceIsReturned() {
assertThat(transactions.getNextNonceForSender(transaction1.getSender())).isEmpty();
addLocalTransactions(1, 2, 4, 5);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(3);
addLocalTransactions(3);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(6);
addLocalTransactions(6, 10);
assertThat(transactions.getNextNonceForSender(transaction1.getSender()))
.isPresent()
.hasValue(7);
}
private void addLocalTransactions(final int... nonces) {
for (int nonce : nonces) {
transactions.addLocalTransaction(createTransaction(nonce));
}
}
}