Cleanup: Synchronizer is always present in protocol context (#7791)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
This commit is contained in:
Fabio Di Fabio
2024-10-24 10:36:21 +02:00
committed by GitHub
parent 71906fa9d4
commit 516559fadc
12 changed files with 45 additions and 29 deletions

View File

@@ -717,7 +717,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethPeers.snapServerPeersNeeded(false);
}
protocolContext.setSynchronizer(Optional.of(synchronizer));
protocolContext.setSynchronizer(synchronizer);
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(

View File

@@ -59,6 +59,7 @@ import org.hyperledger.besu.ethereum.core.ImmutableMiningParameters;
import org.hyperledger.besu.ethereum.core.ImmutableMiningParameters.MutableInitValues;
import org.hyperledger.besu.ethereum.core.ImmutableMiningParameters.Unstable;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@@ -189,6 +190,7 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
protocolContext =
new ProtocolContext(blockchain, worldStateArchive, mergeContext, badBlockManager);
protocolContext.setSynchronizer(mock(Synchronizer.class));
var mutable = worldStateArchive.getMutable();
genesisState.writeStateTo(mutable);
mutable.persist(null);

View File

@@ -181,16 +181,7 @@ public class MainnetBlockValidator implements BlockValidator {
Optional.of(new BlockProcessingOutputs(worldState, receipts, maybeRequests)));
}
} catch (MerkleTrieException ex) {
context
.getSynchronizer()
.ifPresentOrElse(
synchronizer -> synchronizer.healWorldState(ex.getMaybeAddress(), ex.getLocation()),
() ->
handleFailedBlockProcessing(
block,
new BlockProcessingResult(Optional.empty(), ex),
// Do not record bad black due to missing data
false));
context.getSynchronizer().healWorldState(ex.getMaybeAddress(), ex.getLocation());
return new BlockProcessingResult(Optional.empty(), ex);
} catch (StorageException ex) {
var retval = new BlockProcessingResult(Optional.empty(), ex);

View File

@@ -32,8 +32,7 @@ public class ProtocolContext {
private final WorldStateArchive worldStateArchive;
private final BadBlockManager badBlockManager;
private final ConsensusContext consensusContext;
private Optional<Synchronizer> synchronizer;
private Synchronizer synchronizer;
/**
* Constructs a new ProtocolContext with the given blockchain, world state archive, consensus
@@ -52,7 +51,6 @@ public class ProtocolContext {
this.blockchain = blockchain;
this.worldStateArchive = worldStateArchive;
this.consensusContext = consensusContext;
this.synchronizer = Optional.empty();
this.badBlockManager = badBlockManager;
}
@@ -85,7 +83,7 @@ public class ProtocolContext {
*
* @return the synchronizer of the protocol context
*/
public Optional<Synchronizer> getSynchronizer() {
public Synchronizer getSynchronizer() {
return synchronizer;
}
@@ -94,7 +92,7 @@ public class ProtocolContext {
*
* @param synchronizer the synchronizer to set
*/
public void setSynchronizer(final Optional<Synchronizer> synchronizer) {
public void setSynchronizer(final Synchronizer synchronizer) {
this.synchronizer = synchronizer;
}

View File

@@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.core;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.plugin.services.BesuEvents.InitialSyncCompletionListener;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -82,6 +83,22 @@ public interface Synchronizer {
*/
boolean unsubscribeInSync(final long listenerId);
/**
* Add a listener that will be notified when this node initial sync status changes.
*
* @param listener The callback to invoke when the initial sync status changes
* @return A subscription id that can be used to unsubscribe from these events
*/
long subscribeInitialSync(final InitialSyncCompletionListener listener);
/**
* Unsubscribe from initial sync events.
*
* @param listenerId The id returned when subscribing
* @return {@code true} if a subscription was cancelled
*/
boolean unsubscribeInitialSync(final long listenerId);
@FunctionalInterface
interface InSyncListener {
void onInSyncStatusChange(boolean newSyncStatus);

View File

@@ -194,6 +194,7 @@ public class BlockchainSetupUtil {
genesisState.writeStateTo(worldArchive.getMutable());
final ProtocolContext protocolContext = protocolContextProvider.get(blockchain, worldArchive);
protocolContext.setSynchronizer(new DummySynchronizer());
final Path blocksPath = Path.of(chainResources.getBlocksURL().toURI());
final List<Block> blocks = new ArrayList<>();

View File

@@ -12,10 +12,9 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.retesteth;
package org.hyperledger.besu.ethereum.core;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
@@ -81,4 +80,14 @@ public class DummySynchronizer implements Synchronizer {
public boolean unsubscribeInSync(final long listenerId) {
return false;
}
@Override
public long subscribeInitialSync(final BesuEvents.InitialSyncCompletionListener listener) {
return 0;
}
@Override
public boolean unsubscribeInitialSync(final long listenerId) {
return false;
}
}

View File

@@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.mainnet.BlockBodyValidator;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
import org.hyperledger.besu.ethereum.mainnet.BlockProcessor;
@@ -90,6 +91,7 @@ public class MainnetBlockValidatorTest {
when(protocolContext.getBlockchain()).thenReturn(blockchain);
when(protocolContext.getWorldStateArchive()).thenReturn(worldStateArchive);
when(protocolContext.getSynchronizer()).thenReturn(mock(Synchronizer.class));
when(worldStateArchive.getMutable(any(BlockHeader.class), anyBoolean()))
.thenReturn(Optional.of(worldState));
when(worldStateArchive.getMutable(any(Hash.class), any(Hash.class)))

View File

@@ -26,7 +26,6 @@ import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider;
@@ -49,7 +48,6 @@ import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -84,7 +82,6 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener {
static final Hash HASH_LAST = Hash.wrap(Bytes32.leftPad(Bytes.fromHexString("FF"), (byte) 0xFF));
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AtomicLong listenerId = new AtomicLong();
private final EthMessages snapMessages;
private final WorldStateStorageCoordinator worldStateStorageCoordinator;
@@ -111,14 +108,9 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener {
this.protocolContext = Optional.of(protocolContext);
registerResponseConstructors();
// subscribe to initial sync completed events to start/stop snap server:
this.protocolContext
.flatMap(ProtocolContext::getSynchronizer)
.filter(z -> z instanceof DefaultSynchronizer)
.map(DefaultSynchronizer.class::cast)
.ifPresentOrElse(
z -> this.listenerId.set(z.subscribeInitialSync(this)),
() -> LOGGER.warn("SnapServer created without reference to sync status"));
// subscribe to initial sync completed events to start/stop snap server,
// not saving the listenerId since we never need to unsubscribe.
protocolContext.getSynchronizer().subscribeInitialSync(this);
}
/**

View File

@@ -388,10 +388,12 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
return syncState.unsubscribeSyncStatus(listenerId);
}
@Override
public long subscribeInitialSync(final BesuEvents.InitialSyncCompletionListener listener) {
return syncState.subscribeCompletionReached(listener);
}
@Override
public boolean unsubscribeInitialSync(final long listenerId) {
return syncState.unsubscribeInitialConditionReached(listenerId);
}

View File

@@ -35,6 +35,7 @@ dependencies {
implementation project(':ethereum:api')
implementation project(':ethereum:blockcreation')
implementation project(':ethereum:core')
implementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
implementation project(':ethereum:eth')
implementation project(':ethereum:p2p')
implementation project(':ethereum:rlp')

View File

@@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.EthSendRawTran
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.Web3ClientVersion;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.BlockResultFactory;
import org.hyperledger.besu.ethereum.core.DummySynchronizer;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.retesteth.methods.TestGetLogHash;
import org.hyperledger.besu.ethereum.retesteth.methods.TestImportRawBlock;