Cancel older block creation tasks upon receiving a new one (#5263)

* Cancel older block creation tasks upon receiving a new one

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>

* Use only one thread for building blocks so there is no risk of overlapping

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Fix tests that fail when debug log is enabled

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

---------

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
Co-authored-by: Simon Dudley <simon.dudley@consensys.net>
This commit is contained in:
Fabio Di Fabio
2023-03-28 21:34:07 +02:00
committed by GitHub
parent 3aef587e78
commit 323e1c1a13
5 changed files with 83 additions and 42 deletions

View File

@@ -161,7 +161,7 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
this.syncState.set(syncState);
final ExecutorService blockBuilderExecutor =
MonitoredExecutors.newCachedThreadPool("PoS-Block-Builder", 1, metricsSystem);
MonitoredExecutors.newSingleThreadExecutor("PoS-Block-Builder", metricsSystem);
return new MergeCoordinator(
protocolContext,

View File

@@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.consensus.merge.blockcreation;
import static java.util.stream.Collectors.joining;
import static org.hyperledger.besu.consensus.merge.TransitionUtils.isTerminalProofOfWorkBlock;
import static org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator.ForkchoiceResult.Status.INVALID;
@@ -59,6 +60,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
@@ -257,6 +259,8 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
payloadIdentifier);
return payloadIdentifier;
}
// it's a new payloadId so...
cancelAnyExistingBlockCreationTasks(payloadIdentifier);
final MergeBlockCreator mergeBlockCreator =
this.mergeBlockCreatorFactory.forParams(parentHeader, Optional.ofNullable(feeRecipient));
@@ -293,6 +297,20 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
return payloadIdentifier;
}
private void cancelAnyExistingBlockCreationTasks(final PayloadIdentifier payloadIdentifier) {
if (blockCreationTask.size() > 0) {
String existingPayloadIdsBeingBuilt =
blockCreationTask.keySet().stream()
.map(PayloadIdentifier::toHexString)
.collect(joining(","));
LOG.warn(
"New payloadId {} received so cancelling block creation tasks for the following payloadIds: {}",
payloadIdentifier,
existingPayloadIdsBeingBuilt);
blockCreationTask.values().forEach(BlockCreationTask::cancel);
}
}
@Override
public void finalizeProposalById(final PayloadIdentifier payloadId) {
LOG.debug("Finalizing block proposal for payload id {}", payloadId);
@@ -351,11 +369,10 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
recoverableBlockCreation(payloadIdentifier, blockCreator, lastStartAt);
final long lastDuration = System.currentTimeMillis() - lastStartAt;
final long waitBeforeRepetition =
miningParameters.getPosBlockCreationRepetitionMinDuration() - lastDuration;
if (waitBeforeRepetition > 0) {
LOG.debug("Waiting {}ms before repeating block creation", waitBeforeRepetition);
Thread.sleep(waitBeforeRepetition);
}
Math.max(
100, miningParameters.getPosBlockCreationRepetitionMinDuration() - lastDuration);
LOG.debug("Waiting {}ms before repeating block creation", waitBeforeRepetition);
Thread.sleep(waitBeforeRepetition);
} catch (final CancellationException | InterruptedException ce) {
LOG.atDebug()
.setMessage("Block creation for payload id {} has been cancelled, reason {}")
@@ -911,7 +928,8 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
return sw.toString();
}
private boolean isBlockCreationCancelled(final PayloadIdentifier payloadId) {
@VisibleForTesting
boolean isBlockCreationCancelled(final PayloadIdentifier payloadId) {
final BlockCreationTask job = blockCreationTask.get(payloadId);
if (job == null) {
return true;

View File

@@ -15,6 +15,8 @@
package org.hyperledger.besu.consensus.merge;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -23,6 +25,7 @@ import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
@@ -33,6 +36,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -131,7 +135,7 @@ public class PostMergeContextTest {
@Test
public void putAndRetrieveFirstPayload() {
BlockWithReceipts mockBlockWithReceipts = mock(BlockWithReceipts.class);
BlockWithReceipts mockBlockWithReceipts = createBlockWithReceipts(1, 21000, 1);
PayloadIdentifier firstPayloadId = new PayloadIdentifier(1L);
postMergeContext.putPayloadById(firstPayloadId, mockBlockWithReceipts);
@@ -141,19 +145,8 @@ public class PostMergeContextTest {
@Test
public void puttingTwoBlocksWithTheSamePayloadIdWeRetrieveTheBest() {
BlockHeader zeroTxBlockHeader = mock(BlockHeader.class);
when(zeroTxBlockHeader.getGasUsed()).thenReturn(0L);
Block zeroTxBlock = mock(Block.class);
when(zeroTxBlock.getHeader()).thenReturn(zeroTxBlockHeader);
BlockWithReceipts zeroTxBlockWithReceipts = mock(BlockWithReceipts.class);
when(zeroTxBlockWithReceipts.getBlock()).thenReturn(zeroTxBlock);
BlockHeader betterBlockHeader = mock(BlockHeader.class);
when(betterBlockHeader.getGasUsed()).thenReturn(11L);
Block betterBlock = mock(Block.class);
when(betterBlock.getHeader()).thenReturn(betterBlockHeader);
BlockWithReceipts betterBlockWithReceipts = mock(BlockWithReceipts.class);
when(betterBlockWithReceipts.getBlock()).thenReturn(betterBlock);
BlockWithReceipts zeroTxBlockWithReceipts = createBlockWithReceipts(1, 0, 0);
BlockWithReceipts betterBlockWithReceipts = createBlockWithReceipts(2, 11, 1);
PayloadIdentifier payloadId = new PayloadIdentifier(1L);
postMergeContext.putPayloadById(payloadId, zeroTxBlockWithReceipts);
@@ -164,26 +157,9 @@ public class PostMergeContextTest {
@Test
public void puttingABlockWithTheSamePayloadIdSmallerThanAnExistingOneWeRetrieveTheBest() {
BlockHeader zeroTxBlockHeader = mock(BlockHeader.class);
when(zeroTxBlockHeader.getGasUsed()).thenReturn(0L);
Block zeroTxBlock = mock(Block.class);
when(zeroTxBlock.getHeader()).thenReturn(zeroTxBlockHeader);
BlockWithReceipts zeroTxBlockWithReceipts = mock(BlockWithReceipts.class);
when(zeroTxBlockWithReceipts.getBlock()).thenReturn(zeroTxBlock);
BlockHeader betterBlockHeader = mock(BlockHeader.class);
when(betterBlockHeader.getGasUsed()).thenReturn(11L);
Block betterBlock = mock(Block.class);
when(betterBlock.getHeader()).thenReturn(betterBlockHeader);
BlockWithReceipts betterBlockWithReceipts = mock(BlockWithReceipts.class);
when(betterBlockWithReceipts.getBlock()).thenReturn(betterBlock);
BlockHeader smallBlockHeader = mock(BlockHeader.class);
when(smallBlockHeader.getGasUsed()).thenReturn(5L);
Block smallBlock = mock(Block.class);
when(smallBlock.getHeader()).thenReturn(smallBlockHeader);
BlockWithReceipts smallBlockWithReceipts = mock(BlockWithReceipts.class);
when(smallBlockWithReceipts.getBlock()).thenReturn(smallBlock);
BlockWithReceipts zeroTxBlockWithReceipts = createBlockWithReceipts(1, 0, 0);
BlockWithReceipts betterBlockWithReceipts = createBlockWithReceipts(2, 11, 1);
BlockWithReceipts smallBlockWithReceipts = createBlockWithReceipts(3, 5, 1);
PayloadIdentifier payloadId = new PayloadIdentifier(1L);
postMergeContext.putPayloadById(payloadId, zeroTxBlockWithReceipts);
@@ -204,7 +180,7 @@ public class PostMergeContextTest {
public void tryingToRetrieveABlockPutButEvictedReturnsEmpty() {
for (long i = 0; i < PostMergeContext.MAX_BLOCKS_IN_PROGRESS + 1; i++) {
PayloadIdentifier payloadId = new PayloadIdentifier(i);
BlockWithReceipts mockBlockWithReceipts = mock(BlockWithReceipts.class);
BlockWithReceipts mockBlockWithReceipts = createBlockWithReceipts((int) i + 1, 11, 1);
postMergeContext.putPayloadById(payloadId, mockBlockWithReceipts);
}
@@ -239,6 +215,20 @@ public class PostMergeContextTest {
assertThat(postMergeContext.isSyncing()).isFalse();
}
private static BlockWithReceipts createBlockWithReceipts(
final int number, final long gasUsed, final int txCount) {
Block mockBlock = mock(Block.class, RETURNS_DEEP_STUBS);
// using lenient here, since some code is only executed when debug log is enabled
lenient()
.when(mockBlock.toLogString())
.thenReturn(number + " (" + Hash.wrap(Bytes32.random()) + ")");
when(mockBlock.getHeader().getGasUsed()).thenReturn(gasUsed);
when(mockBlock.getBody().getTransactions().size()).thenReturn(txCount);
BlockWithReceipts mockBlockWithReceipts = mock(BlockWithReceipts.class);
when(mockBlockWithReceipts.getBlock()).thenReturn(mockBlock);
return mockBlockWithReceipts;
}
private static class MergeStateChangeCollector implements MergeStateHandler {
final List<Boolean> stateChanges = new ArrayList<>();

View File

@@ -615,6 +615,34 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
}
}
@Test
public void shouldCancelPreviousBlockCreationJobIfCalledAgainWithNewPayloadId() {
final long timestamp = System.currentTimeMillis() / 1000;
var payloadId1 =
coordinator.preparePayload(
genesisState.getBlock().getHeader(),
timestamp,
Bytes32.ZERO,
suggestedFeeRecipient,
Optional.empty());
assertThat(coordinator.isBlockCreationCancelled(payloadId1)).isFalse();
var payloadId2 =
coordinator.preparePayload(
genesisState.getBlock().getHeader(),
timestamp + 1,
Bytes32.ZERO,
suggestedFeeRecipient,
Optional.empty());
assertThat(payloadId1).isNotEqualTo(payloadId2);
assertThat(coordinator.isBlockCreationCancelled(payloadId1)).isTrue();
assertThat(coordinator.isBlockCreationCancelled(payloadId2)).isFalse();
}
@Test
public void childTimestampExceedsParentsFails() {
BlockHeader terminalHeader = terminalPowBlock();

View File

@@ -118,6 +118,11 @@ public class MonitoredExecutors {
new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, rejectedExecutionHandler));
}
public static ExecutorService newSingleThreadExecutor(
final String name, final MetricsSystem metricsSystem) {
return newFixedThreadPool(name, 1, 1, metricsSystem);
}
private static <T extends ThreadPoolExecutor> T newMonitoredExecutor(
final String name,
final MetricsSystem metricsSystem,