mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-09 07:27:55 -05:00
consider peer reputation score when deciding to disconnect (#6187)
* don't disconnect if peer has enough of a score increase to have been useful * use threshold not increase Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com> --------- Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>
This commit is contained in:
@@ -197,7 +197,7 @@ public class EthPeers {
|
||||
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
|
||||
peer.handleDisconnect();
|
||||
abortPendingRequestsAssignedToDisconnectedPeers();
|
||||
LOG.debug("Disconnected EthPeer {}", peer.getShortNodeId());
|
||||
LOG.debug("Disconnected EthPeer {}...", peer.getShortNodeId());
|
||||
LOG.trace("Disconnected EthPeer {}", peer);
|
||||
}
|
||||
}
|
||||
@@ -391,7 +391,7 @@ public class EthPeers {
|
||||
peer -> {
|
||||
LOG.atDebug()
|
||||
.setMessage(
|
||||
"disconnecting peer {}. Waiting for better peers. Current {} of max {}")
|
||||
"disconnecting peer {}... Waiting for better peers. Current {} of max {}")
|
||||
.addArgument(peer::getShortNodeId)
|
||||
.addArgument(this::peerCount)
|
||||
.addArgument(this::getMaxPeers)
|
||||
|
||||
@@ -398,7 +398,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
|
||||
"Disconnect - {} - {} - {}... - {} peers left\n{}",
|
||||
initiatedByPeer ? "Inbound" : "Outbound",
|
||||
reason,
|
||||
connection.getPeer().getId().slice(0, 16),
|
||||
connection.getPeer().getId().slice(0, 8),
|
||||
ethPeers.peerCount(),
|
||||
ethPeers);
|
||||
}
|
||||
|
||||
@@ -34,10 +34,13 @@ import org.slf4j.LoggerFactory;
|
||||
public class PeerReputation implements Comparable<PeerReputation> {
|
||||
static final long USELESS_RESPONSE_WINDOW_IN_MILLIS =
|
||||
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
|
||||
static final int DEFAULT_MAX_SCORE = 150;
|
||||
static final int DEFAULT_MAX_SCORE = 200;
|
||||
// how much above the initial score you need to be to not get disconnected for timeouts/useless
|
||||
// responses
|
||||
private final int hasBeenUsefulThreshold;
|
||||
static final int DEFAULT_INITIAL_SCORE = 100;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class);
|
||||
private static final int TIMEOUT_THRESHOLD = 3;
|
||||
private static final int TIMEOUT_THRESHOLD = 5;
|
||||
private static final int USELESS_RESPONSE_THRESHOLD = 5;
|
||||
|
||||
private final ConcurrentMap<Integer, AtomicInteger> timeoutCountByRequestType =
|
||||
@@ -45,8 +48,7 @@ public class PeerReputation implements Comparable<PeerReputation> {
|
||||
private final Queue<Long> uselessResponseTimes = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private static final int SMALL_ADJUSTMENT = 1;
|
||||
private static final int LARGE_ADJUSTMENT = 10;
|
||||
|
||||
private static final int LARGE_ADJUSTMENT = 5;
|
||||
private int score;
|
||||
|
||||
private final int maxScore;
|
||||
@@ -59,22 +61,37 @@ public class PeerReputation implements Comparable<PeerReputation> {
|
||||
checkArgument(
|
||||
initialScore <= maxScore, "Initial score must be less than or equal to max score");
|
||||
this.maxScore = maxScore;
|
||||
this.hasBeenUsefulThreshold = Math.min(maxScore, initialScore + 10);
|
||||
this.score = initialScore;
|
||||
}
|
||||
|
||||
public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
|
||||
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
|
||||
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
|
||||
LOG.debug(
|
||||
"Disconnection triggered by {} repeated timeouts for requestCode {}",
|
||||
newTimeoutCount,
|
||||
requestCode);
|
||||
score -= LARGE_ADJUSTMENT;
|
||||
return Optional.of(DisconnectReason.TIMEOUT);
|
||||
// don't trigger disconnect if this peer has a sufficiently high reputation score
|
||||
if (peerHasNotBeenUseful()) {
|
||||
LOG.debug(
|
||||
"Disconnection triggered by {} repeated timeouts for requestCode {}, peer score {}",
|
||||
newTimeoutCount,
|
||||
requestCode,
|
||||
score);
|
||||
return Optional.of(DisconnectReason.TIMEOUT);
|
||||
}
|
||||
|
||||
LOG.trace(
|
||||
"Not triggering disconnect for {} repeated timeouts for requestCode {} because peer has high score {}",
|
||||
newTimeoutCount,
|
||||
requestCode,
|
||||
score);
|
||||
} else {
|
||||
score -= SMALL_ADJUSTMENT;
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private boolean peerHasNotBeenUseful() {
|
||||
return score < hasBeenUsefulThreshold;
|
||||
}
|
||||
|
||||
public void resetTimeoutCount(final int requestCode) {
|
||||
@@ -96,12 +113,19 @@ public class PeerReputation implements Comparable<PeerReputation> {
|
||||
}
|
||||
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
|
||||
score -= LARGE_ADJUSTMENT;
|
||||
LOG.debug("Disconnection triggered by exceeding useless response threshold");
|
||||
return Optional.of(DisconnectReason.USELESS_PEER);
|
||||
// don't trigger disconnect if this peer has a sufficiently high reputation score
|
||||
if (peerHasNotBeenUseful()) {
|
||||
LOG.debug(
|
||||
"Disconnection triggered by exceeding useless response threshold, score {}", score);
|
||||
return Optional.of(DisconnectReason.USELESS_PEER);
|
||||
}
|
||||
LOG.trace(
|
||||
"Not triggering disconnect for exceeding useless response threshold because peer has high score {}",
|
||||
score);
|
||||
} else {
|
||||
score -= SMALL_ADJUSTMENT;
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public void recordUsefulResponse() {
|
||||
|
||||
@@ -36,6 +36,8 @@ public class PeerReputationTest {
|
||||
|
||||
@Test
|
||||
public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
|
||||
@@ -45,6 +47,11 @@ public class PeerReputationTest {
|
||||
public void shouldTrackTimeoutsSeparatelyForDifferentRequestTypes() {
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
|
||||
@@ -57,6 +64,8 @@ public class PeerReputationTest {
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
|
||||
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user