mirror of
https://github.com/vacp2p/status-linea-besu.git
synced 2026-01-09 22:07:59 -05:00
Disconnect worst peer (#6443)
* When refreshing, only disconnect a peer if we have max peers * If we are disconnecting a peer, disconnect the least useful peer Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net> Signed-off-by: Stefan Pingel <16143240+pinges@users.noreply.github.com> Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
This commit is contained in:
@@ -690,7 +690,7 @@ public class RunnerBuilder {
|
||||
.timestampForks(besuController.getGenesisConfigOptions().getForkBlockTimestamps())
|
||||
.allConnectionsSupplier(ethPeers::getAllConnections)
|
||||
.allActiveConnectionsSupplier(ethPeers::getAllActiveConnections)
|
||||
.peersLowerBound(ethPeers.getPeerLowerBound())
|
||||
.maxPeers(ethPeers.getMaxPeers())
|
||||
.build();
|
||||
};
|
||||
|
||||
|
||||
@@ -54,10 +54,14 @@ import org.slf4j.LoggerFactory;
|
||||
public class EthPeers {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class);
|
||||
public static final Comparator<EthPeer> TOTAL_DIFFICULTY =
|
||||
Comparator.comparing(((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty()));
|
||||
Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty());
|
||||
|
||||
public static final Comparator<EthPeer> CHAIN_HEIGHT =
|
||||
Comparator.comparing(((final EthPeer p) -> p.chainState().getEstimatedHeight()));
|
||||
Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedHeight());
|
||||
|
||||
public static final Comparator<EthPeer> MOST_USEFUL_PEER =
|
||||
Comparator.comparing((final EthPeer p) -> p.getReputation().getScore())
|
||||
.thenComparing(CHAIN_HEIGHT);
|
||||
|
||||
public static final Comparator<EthPeer> HEAVIEST_CHAIN =
|
||||
TOTAL_DIFFICULTY.thenComparing(CHAIN_HEIGHT);
|
||||
|
||||
@@ -137,15 +137,14 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry
|
||||
// or the least useful
|
||||
|
||||
if (peers.peerCount() >= peers.getMaxPeers()) {
|
||||
failedPeers.stream()
|
||||
.filter(peer -> !peer.isDisconnected())
|
||||
.findAny()
|
||||
.or(() -> peers.streamAvailablePeers().min(peers.getBestChainComparator()))
|
||||
failedPeers.stream().filter(peer -> !peer.isDisconnected()).findAny().stream()
|
||||
.min(EthPeers.MOST_USEFUL_PEER)
|
||||
.or(() -> peers.streamAvailablePeers().min(EthPeers.MOST_USEFUL_PEER))
|
||||
.ifPresent(
|
||||
peer -> {
|
||||
LOG.atDebug()
|
||||
.setMessage(
|
||||
"Refresh peers disconnecting peer {}... Waiting for better peers. Current {} of max {}")
|
||||
"Refresh peers disconnecting peer {} Waiting for better peers. Current {} of max {}")
|
||||
.addArgument(peer::getLoggableId)
|
||||
.addArgument(peers::peerCount)
|
||||
.addArgument(peers::getMaxPeers)
|
||||
|
||||
@@ -189,10 +189,9 @@ public class DefaultP2PNetwork implements P2PNetwork {
|
||||
this.peerPermissions = peerPermissions;
|
||||
this.vertx = vertx;
|
||||
|
||||
// set the requirement here that the number of peers be greater than the lower bound
|
||||
final int peerLowerBound = rlpxAgent.getPeerLowerBound();
|
||||
LOG.debug("setting peerLowerBound {}", peerLowerBound);
|
||||
peerDiscoveryAgent.addPeerRequirement(() -> rlpxAgent.getConnectionCount() >= peerLowerBound);
|
||||
final int maxPeers = rlpxAgent.getMaxPeers();
|
||||
LOG.debug("setting maxPeers {}", maxPeers);
|
||||
peerDiscoveryAgent.addPeerRequirement(() -> rlpxAgent.getConnectionCount() >= maxPeers);
|
||||
subscribeDisconnect(reputationManager);
|
||||
}
|
||||
|
||||
@@ -512,7 +511,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
|
||||
private boolean legacyForkIdEnabled = false;
|
||||
private Supplier<Stream<PeerConnection>> allConnectionsSupplier;
|
||||
private Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier;
|
||||
private int peersLowerBound;
|
||||
private int maxPeers;
|
||||
private PeerTable peerTable;
|
||||
|
||||
public P2PNetwork build() {
|
||||
@@ -593,7 +592,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
|
||||
.p2pTLSConfiguration(p2pTLSConfiguration)
|
||||
.allConnectionsSupplier(allConnectionsSupplier)
|
||||
.allActiveConnectionsSupplier(allActiveConnectionsSupplier)
|
||||
.peersLowerBound(peersLowerBound)
|
||||
.maxPeers(maxPeers)
|
||||
.peerTable(peerTable)
|
||||
.build();
|
||||
}
|
||||
@@ -710,8 +709,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder peersLowerBound(final int peersLowerBound) {
|
||||
this.peersLowerBound = peersLowerBound;
|
||||
public Builder maxPeers(final int maxPeers) {
|
||||
this.maxPeers = maxPeers;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ public class RlpxAgent {
|
||||
private final PeerPrivileges peerPrivileges;
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||
private final int lowerBound;
|
||||
private final int maxPeers;
|
||||
private final Supplier<Stream<PeerConnection>> allConnectionsSupplier;
|
||||
private final Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier;
|
||||
private final Cache<Bytes, CompletableFuture<PeerConnection>> peersConnectingCache =
|
||||
@@ -87,7 +87,7 @@ public class RlpxAgent {
|
||||
final ConnectionInitializer connectionInitializer,
|
||||
final PeerRlpxPermissions peerPermissions,
|
||||
final PeerPrivileges peerPrivileges,
|
||||
final int peersLowerBound,
|
||||
final int maxPeers,
|
||||
final Supplier<Stream<PeerConnection>> allConnectionsSupplier,
|
||||
final Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier) {
|
||||
this.localNode = localNode;
|
||||
@@ -95,7 +95,7 @@ public class RlpxAgent {
|
||||
this.connectionInitializer = connectionInitializer;
|
||||
this.peerPermissions = peerPermissions;
|
||||
this.peerPrivileges = peerPrivileges;
|
||||
this.lowerBound = peersLowerBound;
|
||||
this.maxPeers = maxPeers;
|
||||
this.allConnectionsSupplier = allConnectionsSupplier;
|
||||
this.allActiveConnectionsSupplier = allActiveConnectionsSupplier;
|
||||
}
|
||||
@@ -358,8 +358,8 @@ public class RlpxAgent {
|
||||
return peersConnectingCache.asMap();
|
||||
}
|
||||
|
||||
public int getPeerLowerBound() {
|
||||
return lowerBound;
|
||||
public int getMaxPeers() {
|
||||
return maxPeers;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
@@ -374,7 +374,7 @@ public class RlpxAgent {
|
||||
private Optional<TLSConfiguration> p2pTLSConfiguration;
|
||||
private Supplier<Stream<PeerConnection>> allConnectionsSupplier;
|
||||
private Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier;
|
||||
private int peersLowerBound;
|
||||
private int maxPeers;
|
||||
private PeerTable peerTable;
|
||||
|
||||
private Builder() {}
|
||||
@@ -413,7 +413,7 @@ public class RlpxAgent {
|
||||
connectionInitializer,
|
||||
rlpxPermissions,
|
||||
peerPrivileges,
|
||||
peersLowerBound,
|
||||
maxPeers,
|
||||
allConnectionsSupplier,
|
||||
allActiveConnectionsSupplier);
|
||||
}
|
||||
@@ -492,8 +492,8 @@ public class RlpxAgent {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder peersLowerBound(final int peersLowerBound) {
|
||||
this.peersLowerBound = peersLowerBound;
|
||||
public Builder maxPeers(final int maxPeers) {
|
||||
this.maxPeers = maxPeers;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user