QBFT migration: Create migrating mining coordinator (#3097)

Create MigratingMiningCoordinator, a schedulable mining coordinator that will switch between mining coordinators based on the consensus schedule.

https://github.com/hyperledger/besu/issues/2999

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
This commit is contained in:
Simon Dudley
2021-12-07 02:41:37 +01:00
committed by GitHub
parent 17f468edec
commit 6ad73e6375
5 changed files with 460 additions and 12 deletions

View File

@@ -18,9 +18,13 @@
package org.hyperledger.besu.controller;
import static org.hyperledger.besu.ethereum.core.BlockHeader.GENESIS_BLOCK_NUMBER;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.consensus.common.CombinedProtocolScheduleFactory;
import org.hyperledger.besu.consensus.common.ForkSpec;
import org.hyperledger.besu.consensus.common.ForksSchedule;
import org.hyperledger.besu.consensus.common.MigratingMiningCoordinator;
import org.hyperledger.besu.consensus.qbft.pki.PkiBlockCreationConfiguration;
import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.datatypes.Hash;
@@ -104,6 +108,7 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
@Override
protected void prepForBuild() {
besuControllerBuilderSchedule.values().forEach(BesuControllerBuilder::prepForBuild);
super.prepForBuild();
}
@Override
@@ -114,15 +119,29 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
final MiningParameters miningParameters,
final SyncState syncState,
final EthProtocolManager ethProtocolManager) {
return besuControllerBuilderSchedule
.get(0L)
.createMiningCoordinator(
protocolSchedule,
protocolContext,
transactionPool,
miningParameters,
syncState,
ethProtocolManager);
final List<ForkSpec<MiningCoordinator>> miningCoordinatorForkSpecs =
besuControllerBuilderSchedule.entrySet().stream()
.map(
e ->
new ForkSpec<>(
e.getKey(),
e.getValue()
.createMiningCoordinator(
protocolSchedule,
protocolContext,
transactionPool,
miningParameters,
syncState,
ethProtocolManager)))
.collect(Collectors.toList());
final ForksSchedule<MiningCoordinator> miningCoordinatorSchedule =
new ForksSchedule<>(
miningCoordinatorForkSpecs.get(0),
miningCoordinatorForkSpecs.subList(1, miningCoordinatorForkSpecs.size()));
return new MigratingMiningCoordinator(
miningCoordinatorSchedule, protocolContext.getBlockchain());
}
@Override
@@ -169,7 +188,7 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
@Override
protected void validateContext(final ProtocolContext context) {
besuControllerBuilderSchedule.get(0L).validateContext(context);
besuControllerBuilderSchedule.get(GENESIS_BLOCK_NUMBER).validateContext(context);
}
@Override

View File

@@ -17,12 +17,25 @@
*/
package org.hyperledger.besu.controller;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.StubGenesisConfigOptions;
import org.hyperledger.besu.consensus.common.ForkSpec;
import org.hyperledger.besu.consensus.common.MigratingMiningCoordinator;
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftMiningCoordinator;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.blockcreation.NoopMiningCoordinator;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import java.math.BigInteger;
@@ -34,6 +47,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import org.assertj.core.api.SoftAssertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -52,6 +66,8 @@ public class ConsensusScheduleBesuControllerBuilderTest {
private @Mock ProtocolSchedule protocolSchedule1;
private @Mock ProtocolSchedule protocolSchedule2;
private @Mock ProtocolSchedule protocolSchedule3;
private @Mock NoopMiningCoordinator miningCoordinator1;
private @Mock BftMiningCoordinator miningCoordinator2;
@Test
public void mustProvideNonNullConsensusScheduleWhenInstantiatingNew() {
@@ -98,4 +114,52 @@ public class ConsensusScheduleBesuControllerBuilderTest {
Mockito.verify(combinedProtocolScheduleFactory)
.apply(expectedProtocolSchedulesSpecs, Optional.of(BigInteger.TEN));
}
@Test
public void createsScheduableMiningCoordinator() {
final Map<Long, BesuControllerBuilder> consensusSchedule =
Map.of(0L, besuControllerBuilder1, 5L, besuControllerBuilder2);
when(besuControllerBuilder1.createMiningCoordinator(any(), any(), any(), any(), any(), any()))
.thenReturn(miningCoordinator1);
when(besuControllerBuilder2.createMiningCoordinator(any(), any(), any(), any(), any(), any()))
.thenReturn(miningCoordinator2);
final ProtocolContext mockProtocolContext = mock(ProtocolContext.class);
when(mockProtocolContext.getBlockchain()).thenReturn(mock(MutableBlockchain.class));
final ConsensusScheduleBesuControllerBuilder builder =
new ConsensusScheduleBesuControllerBuilder(consensusSchedule);
final MiningCoordinator miningCoordinator =
builder.createMiningCoordinator(
protocolSchedule1,
mockProtocolContext,
mock(TransactionPool.class),
mock(MiningParameters.class),
mock(SyncState.class),
mock(EthProtocolManager.class));
assertThat(miningCoordinator).isInstanceOf(MigratingMiningCoordinator.class);
final MigratingMiningCoordinator migratingMiningCoordinator =
(MigratingMiningCoordinator) miningCoordinator;
SoftAssertions.assertSoftly(
(softly) -> {
softly
.assertThat(
migratingMiningCoordinator.getMiningCoordinatorSchedule().getFork(0L).getValue())
.isSameAs(miningCoordinator1);
softly
.assertThat(
migratingMiningCoordinator.getMiningCoordinatorSchedule().getFork(4L).getValue())
.isSameAs(miningCoordinator1);
softly
.assertThat(
migratingMiningCoordinator.getMiningCoordinatorSchedule().getFork(5L).getValue())
.isSameAs(miningCoordinator2);
softly
.assertThat(
migratingMiningCoordinator.getMiningCoordinatorSchedule().getFork(6L).getValue())
.isSameAs(miningCoordinator2);
});
}
}

View File

@@ -0,0 +1,138 @@
/*
* Copyright Hyperledger Besu contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.common;
import static org.apache.logging.log4j.LogManager.getLogger;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import java.util.List;
import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
public class MigratingMiningCoordinator implements MiningCoordinator, BlockAddedObserver {
private static final Logger LOG = getLogger();
private final ForksSchedule<MiningCoordinator> miningCoordinatorSchedule;
private final Blockchain blockchain;
private MiningCoordinator activeMiningCoordinator;
private long blockAddedObserverId;
public MigratingMiningCoordinator(
final ForksSchedule<MiningCoordinator> miningCoordinatorSchedule,
final Blockchain blockchain) {
this.miningCoordinatorSchedule = miningCoordinatorSchedule;
this.blockchain = blockchain;
this.activeMiningCoordinator =
this.miningCoordinatorSchedule.getFork(blockchain.getChainHeadBlockNumber()).getValue();
}
@Override
public void start() {
blockAddedObserverId = blockchain.observeBlockAdded(this);
activeMiningCoordinator.start();
}
@Override
public void stop() {
blockchain.removeObserver(blockAddedObserverId);
activeMiningCoordinator.stop();
}
@Override
public void awaitStop() throws InterruptedException {
activeMiningCoordinator.awaitStop();
}
@Override
public boolean enable() {
return activeMiningCoordinator.enable();
}
@Override
public boolean disable() {
return activeMiningCoordinator.disable();
}
@Override
public boolean isMining() {
return activeMiningCoordinator.isMining();
}
@Override
public Wei getMinTransactionGasPrice() {
return activeMiningCoordinator.getMinTransactionGasPrice();
}
@Override
public void setExtraData(final Bytes extraData) {
activeMiningCoordinator.setExtraData(extraData);
}
@Override
public Optional<Address> getCoinbase() {
return activeMiningCoordinator.getCoinbase();
}
@Override
public Optional<Block> createBlock(
final BlockHeader parentHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers) {
return activeMiningCoordinator.createBlock(parentHeader, transactions, ommers);
}
@Override
public void changeTargetGasLimit(final Long targetGasLimit) {
activeMiningCoordinator.changeTargetGasLimit(targetGasLimit);
}
@Override
public void onBlockAdded(final BlockAddedEvent event) {
final long currentBlock = event.getBlock().getHeader().getNumber();
final MiningCoordinator nextMiningCoordinator =
miningCoordinatorSchedule.getFork(currentBlock + 1).getValue();
if (activeMiningCoordinator != nextMiningCoordinator) {
LOG.trace(
"Switching mining coordinator at block {} from {} to {}",
currentBlock,
activeMiningCoordinator.getClass().getSimpleName(),
nextMiningCoordinator.getClass().getSimpleName());
activeMiningCoordinator.stop();
nextMiningCoordinator.start();
activeMiningCoordinator = nextMiningCoordinator;
}
if (activeMiningCoordinator instanceof BlockAddedObserver) {
((BlockAddedObserver) activeMiningCoordinator).onBlockAdded(event);
}
}
@VisibleForTesting
public ForksSchedule<MiningCoordinator> getMiningCoordinatorSchedule() {
return this.miningCoordinatorSchedule;
}
}

View File

@@ -0,0 +1,227 @@
/*
* Copyright Hyperledger Besu contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.common;
import static java.util.Collections.emptyList;
import static org.hyperledger.besu.ethereum.core.BlockHeader.GENESIS_BLOCK_NUMBER;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftMiningCoordinator;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.blockcreation.NoopMiningCoordinator;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import java.util.List;
import java.util.function.Consumer;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class MigratingMiningCoordinatorTest {
@Mock private BftMiningCoordinator coordinator1;
@Mock private BftMiningCoordinator coordinator2;
@Mock private NoopMiningCoordinator noopCoordinator;
@Mock private Blockchain blockchain;
@Mock private BlockHeader blockHeader;
@Mock private BlockBody blockBody;
private Block block;
private BlockAddedEvent blockEvent;
private ForksSchedule<MiningCoordinator> miningCoordinatorSchedule;
private static final long MIGRATION_BLOCK_NUMBER = 5L;
@Before
public void setup() {
final ForkSpec<MiningCoordinator> genesisFork =
new ForkSpec<>(GENESIS_BLOCK_NUMBER, coordinator1);
final ForkSpec<MiningCoordinator> migrationFork =
new ForkSpec<>(MIGRATION_BLOCK_NUMBER, coordinator2);
miningCoordinatorSchedule = new ForksSchedule<>(genesisFork, List.of(migrationFork));
this.block = new Block(blockHeader, blockBody);
blockEvent = BlockAddedEvent.createForHeadAdvancement(this.block, emptyList(), emptyList());
}
@Test
public void startShouldRegisterThisCoordinatorAsObserver() {
final MigratingMiningCoordinator coordinator =
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain);
coordinator.start();
verify(blockchain).observeBlockAdded(coordinator);
}
@Test
public void stopShouldUnregisterThisCoordinatorAsObserver() {
final MigratingMiningCoordinator coordinator =
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain);
when(blockchain.observeBlockAdded(coordinator)).thenReturn(1L);
coordinator.start();
coordinator.stop();
verify(blockchain).removeObserver(1L);
}
@Test
public void onBlockAddedShouldMigrateToNextMiningCoordinatorAndDelegate() {
when(blockHeader.getNumber()).thenReturn(MIGRATION_BLOCK_NUMBER - 1);
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain).onBlockAdded(blockEvent);
verify(coordinator1).stop();
verify(coordinator2).start();
verify(coordinator2).onBlockAdded(blockEvent);
}
@Test
public void onBlockAddedShouldNotDelegateWhenDelegateIsNoop() {
ForksSchedule<MiningCoordinator> noopCoordinatorSchedule =
new ForksSchedule<>(new ForkSpec<>(GENESIS_BLOCK_NUMBER, noopCoordinator), emptyList());
when(blockHeader.getNumber()).thenReturn(GENESIS_BLOCK_NUMBER);
new MigratingMiningCoordinator(noopCoordinatorSchedule, blockchain).onBlockAdded(blockEvent);
verifyNoInteractions(noopCoordinator);
}
@Test
public void delegatesToActiveMiningCoordinator() {
verifyDelegation(MiningCoordinator::start, GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(MiningCoordinator::start, MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(MiningCoordinator::stop, GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(MiningCoordinator::stop, MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(MiningCoordinator::enable, GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(MiningCoordinator::enable, MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(MiningCoordinator::disable, GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(
MiningCoordinator::disable, MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(MiningCoordinator::isMining, GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(
MiningCoordinator::isMining, MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(
MiningCoordinator::getMinTransactionGasPrice,
GENESIS_BLOCK_NUMBER,
coordinator1,
coordinator2);
verifyDelegation(
MiningCoordinator::getMinTransactionGasPrice,
MIGRATION_BLOCK_NUMBER,
coordinator2,
coordinator1);
verifyDelegation(
c -> c.setExtraData(Bytes.EMPTY), GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(
c -> c.setExtraData(Bytes.EMPTY), MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(
MiningCoordinator::getCoinbase, GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(
MiningCoordinator::getCoinbase, MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(
c -> c.createBlock(blockHeader, emptyList(), emptyList()),
GENESIS_BLOCK_NUMBER,
coordinator1,
coordinator2);
verifyDelegation(
c -> c.createBlock(blockHeader, emptyList(), emptyList()),
MIGRATION_BLOCK_NUMBER,
coordinator2,
coordinator1);
verifyDelegation(
c -> c.changeTargetGasLimit(1L), GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(
c -> c.changeTargetGasLimit(1L), MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
verifyDelegation(
c -> c.changeTargetGasLimit(1L), GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegation(
c -> c.changeTargetGasLimit(1L), MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
}
private void verifyDelegation(
final Consumer<MiningCoordinator> methodUnderTest,
final long blockHeight,
final MiningCoordinator expectedActiveCoordinator,
final MiningCoordinator expectedInactiveCoordinator) {
when(blockchain.getChainHeadBlockNumber()).thenReturn(blockHeight);
methodUnderTest.accept(new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain));
methodUnderTest.accept(verify(expectedActiveCoordinator));
verifyNoInteractions(expectedInactiveCoordinator);
reset(coordinator1, coordinator2);
}
@Test
public void verifyDelegationForAwaitStop() throws InterruptedException {
verifyDelegationForAwaitStop(GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegationForAwaitStop(MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
}
private void verifyDelegationForAwaitStop(
final long blockHeight,
final MiningCoordinator expectedActiveCoordinator,
final MiningCoordinator expectedInactiveCoordinator)
throws InterruptedException {
when(blockchain.getChainHeadBlockNumber()).thenReturn(blockHeight);
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain).awaitStop();
verify(expectedActiveCoordinator).awaitStop();
verifyNoInteractions(expectedInactiveCoordinator);
reset(coordinator1, coordinator2);
}
@Test
public void verifyDelegationForOnBlockAdded() {
verifyDelegationForOnBlockAdded(GENESIS_BLOCK_NUMBER, coordinator1, coordinator2);
verifyDelegationForOnBlockAdded(MIGRATION_BLOCK_NUMBER, coordinator2, coordinator1);
}
private void verifyDelegationForOnBlockAdded(
final long blockHeight,
final BftMiningCoordinator expectedActiveCoordinator,
final BftMiningCoordinator expectedInactiveCoordinator) {
when(blockchain.getChainHeadBlockNumber()).thenReturn(blockHeight);
when(blockHeader.getNumber()).thenReturn(blockHeight);
new MigratingMiningCoordinator(miningCoordinatorSchedule, blockchain).onBlockAdded(blockEvent);
verify(expectedActiveCoordinator).onBlockAdded(blockEvent);
verifyNoInteractions(expectedInactiveCoordinator);
reset(coordinator1, coordinator2);
}
}

View File

@@ -37,8 +37,8 @@ import org.apache.logging.log4j.Logger;
* subscribers.unsubscribe(this::onSomeEvent);</code>
* </pre>
*
* <p>Since the two separate <code>this:onSomeEvent</code> are not equal, the subscriber wouldn't be
* removed. This bug is avoided by assigning each subscriber a unique ID and using that to
* <p>Since the two separate <code>this::onSomeEvent</code> are not equal, the subscriber wouldn't
* be removed. This bug is avoided by assigning each subscriber a unique ID and using that to
* unsubscribe.
*
* @param <T> the type of subscribers