allow disabling of transaction indexing while syncing (#8181)

Signed-off-by: Jason Frame <jason.frame@consensys.net>
This commit is contained in:
Jason Frame
2025-02-13 16:45:08 +10:00
committed by GitHub
parent 6342f5d678
commit c34e3967fd
11 changed files with 138 additions and 21 deletions

View File

@@ -2,6 +2,7 @@
## Unreleased
### Breaking Changes
### Upcoming Breaking Changes
- `MetricSystem::createLabelledGauge` is deprecated and will be removed in a future release, replace it with `MetricSystem::createLabelledSuppliedGauge`
- k8s (KUBERNETES) Nat method is now deprecated and will be removed in a future release. Use docker or none instead.
@@ -14,6 +15,7 @@
- Smart-contract-based (onchain) permissioning
- Proof of Work consensus
- Fast Sync
- Transaction indexing will be disabled by default in a future release for snap sync and checkpoint sync modes. This will break RPCs that use transaction hash for historical queries.
### Additions and Improvements
- Add TLS/mTLS options and configure the GraphQL HTTP service[#7910](https://github.com/hyperledger/besu/pull/7910)
- Allow plugins to propose transactions during block creation [#8268](https://github.com/hyperledger/besu/pull/8268)

View File

@@ -72,6 +72,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
"--Xsnapsync-synchronizer-bytecode-count-per-request";
private static final String SNAP_TRIENODE_COUNT_PER_REQUEST_FLAG =
"--Xsnapsync-synchronizer-trienode-count-per-request";
private static final String SNAP_TRANSACTION_INDEXING_ENABLED_FLAG =
"--Xsnapsync-synchronizer-transaction-indexing-enabled";
private static final String SNAP_FLAT_ACCOUNT_HEALED_COUNT_PER_REQUEST_FLAG =
"--Xsnapsync-synchronizer-flat-account-healed-count-per-request";
@@ -319,6 +321,15 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
"Temporary feature toggle to enable using the new peertask system (default: ${DEFAULT-VALUE})")
private final Boolean isPeerTaskSystemEnabled = false;
@CommandLine.Option(
names = SNAP_TRANSACTION_INDEXING_ENABLED_FLAG,
hidden = true,
paramLabel = "<Boolean>",
arity = "0..1",
description = "Enable transaction indexing during snap sync. (default: ${DEFAULT-VALUE})")
private Boolean snapTransactionIndexingEnabled =
SnapSyncConfiguration.DEFAULT_SNAP_SYNC_TRANSACTION_INDEXING_ENABLED;
private SynchronizerOptions() {}
/**
@@ -399,6 +410,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
options.checkpointPostMergeSyncEnabled = config.isCheckpointPostMergeEnabled();
options.snapsyncServerEnabled = config.getSnapSyncConfiguration().isSnapServerEnabled();
options.snapsyncBftEnabled = config.getSnapSyncConfiguration().isSnapSyncBftEnabled();
options.snapTransactionIndexingEnabled =
config.getSnapSyncConfiguration().isSnapSyncTransactionIndexingEnabled();
return options;
}
@@ -432,6 +445,7 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
.localFlatStorageCountToHealPerRequest(snapsyncFlatStorageHealedCountPerRequest)
.isSnapServerEnabled(snapsyncServerEnabled)
.isSnapSyncBftEnabled(snapsyncBftEnabled)
.isSnapSyncTransactionIndexingEnabled(snapTransactionIndexingEnabled)
.build());
builder.checkpointPostMergeEnabled(checkpointPostMergeSyncEnabled);
builder.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled);
@@ -491,7 +505,9 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
SNAP_SERVER_ENABLED_FLAG,
OptionParser.format(snapsyncServerEnabled),
SNAP_SYNC_BFT_ENABLED_FLAG,
OptionParser.format(snapsyncBftEnabled));
OptionParser.format(snapsyncBftEnabled),
SNAP_TRANSACTION_INDEXING_ENABLED_FLAG,
OptionParser.format(snapTransactionIndexingEnabled));
return value;
}
}

View File

@@ -78,6 +78,7 @@ public class SynchronizerOptionsTest
.bytecodeCountPerRequest(
SnapSyncConfiguration.DEFAULT_BYTECODE_COUNT_PER_REQUEST + 2)
.isSnapServerEnabled(Boolean.TRUE)
.isSnapSyncTransactionIndexingEnabled(Boolean.TRUE)
.build());
}

View File

@@ -417,13 +417,20 @@ public class DefaultBlockchain implements MutableBlockchain {
@Override
public synchronized void appendBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), false);
appendBlockHelper(new BlockWithReceipts(block, receipts), false, true);
}
@Override
public synchronized void appendBlockWithoutIndexingTransactions(
final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), false, false);
}
@Override
public synchronized void storeBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), true);
appendBlockHelper(new BlockWithReceipts(block, receipts), true, true);
}
private void cacheBlockData(final Block block, final List<TransactionReceipt> receipts) {
@@ -447,7 +454,9 @@ public class DefaultBlockchain implements MutableBlockchain {
}
private void appendBlockHelper(
final BlockWithReceipts blockWithReceipts, final boolean storeOnly) {
final BlockWithReceipts blockWithReceipts,
final boolean storeOnly,
final boolean transactionIndexing) {
if (!blockShouldBeProcessed(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts())) {
return;
@@ -469,7 +478,7 @@ public class DefaultBlockchain implements MutableBlockchain {
if (storeOnly) {
blockAddedEvent = handleStoreOnly(blockWithReceipts);
} else {
blockAddedEvent = updateCanonicalChainData(updater, blockWithReceipts);
blockAddedEvent = updateCanonicalChainData(updater, blockWithReceipts, transactionIndexing);
if (blockAddedEvent.isNewCanonicalHead()) {
updateCacheForNewCanonicalHead(block, td);
}
@@ -521,7 +530,9 @@ public class DefaultBlockchain implements MutableBlockchain {
}
private BlockAddedEvent updateCanonicalChainData(
final BlockchainStorage.Updater updater, final BlockWithReceipts blockWithReceipts) {
final Updater updater,
final BlockWithReceipts blockWithReceipts,
final boolean transactionIndexing) {
final Block newBlock = blockWithReceipts.getBlock();
final Hash chainHead = blockchainStorage.getChainHead().orElse(null);
@@ -532,7 +543,7 @@ public class DefaultBlockchain implements MutableBlockchain {
try {
if (newBlock.getHeader().getParentHash().equals(chainHead) || chainHead == null) {
return handleNewHead(updater, blockWithReceipts);
return handleNewHead(updater, blockWithReceipts, transactionIndexing);
} else if (blockChoiceRule.compare(newBlock.getHeader(), chainHeader) > 0) {
// New block represents a chain reorganization
return handleChainReorg(updater, blockWithReceipts);
@@ -553,14 +564,18 @@ public class DefaultBlockchain implements MutableBlockchain {
}
private BlockAddedEvent handleNewHead(
final Updater updater, final BlockWithReceipts blockWithReceipts) {
final Updater updater,
final BlockWithReceipts blockWithReceipts,
final boolean transactionIndexing) {
// This block advances the chain, update the chain head
final Hash newBlockHash = blockWithReceipts.getHash();
updater.putBlockHash(blockWithReceipts.getNumber(), newBlockHash);
updater.setChainHead(newBlockHash);
indexTransactionsForBlock(
updater, newBlockHash, blockWithReceipts.getBlock().getBody().getTransactions());
if (transactionIndexing) {
indexTransactionsForBlock(
updater, newBlockHash, blockWithReceipts.getBlock().getBody().getTransactions());
}
gasUsedCounter.inc(blockWithReceipts.getHeader().getGasUsed());
numberOfTransactionsCounter.inc(
blockWithReceipts.getBlock().getBody().getTransactions().size());
@@ -745,7 +760,7 @@ public class DefaultBlockchain implements MutableBlockchain {
try {
final BlockWithReceipts blockWithReceipts = getBlockWithReceipts(blockHeader).get();
BlockAddedEvent newHeadEvent = handleNewHead(updater, blockWithReceipts);
BlockAddedEvent newHeadEvent = handleNewHead(updater, blockWithReceipts, true);
updateCacheForNewCanonicalHead(
blockWithReceipts.getBlock(), calculateTotalDifficulty(blockHeader));
updater.commit();

View File

@@ -37,6 +37,18 @@ public interface MutableBlockchain extends Blockchain {
*/
void appendBlock(Block block, List<TransactionReceipt> receipts);
/**
* Adds a block to the blockchain without indexing transactions.
*
* <p>Block must be connected to the existing blockchain (its parent must already be stored),
* otherwise an {@link IllegalArgumentException} is thrown. Blocks representing forks are allowed
* as long as they are connected.
*
* @param block The block to append.
* @param receipts The list of receipts associated with this block's transactions.
*/
void appendBlockWithoutIndexingTransactions(Block block, List<TransactionReceipt> receipts);
/**
* Adds a block to the blockchain, without updating the chain state.
*

View File

@@ -71,6 +71,8 @@ public interface BlockImporter {
* @param receipts The receipts associated with this block.
* @param headerValidationMode Determines the validation to perform on this header.
* @param ommerValidationMode Determines the validation to perform on ommer headers.
* @param bodyValidationMode Determines the validation to perform on the block's body.
* @param importWithTxIndexing Whether to import the block with transaction indexing.
* @return {@code BlockImportResult}
* @see BlockImportResult
*/
@@ -80,5 +82,6 @@ public interface BlockImporter {
List<TransactionReceipt> receipts,
HeaderValidationMode headerValidationMode,
HeaderValidationMode ommerValidationMode,
BodyValidationMode bodyValidationMode);
BodyValidationMode bodyValidationMode,
boolean importWithTxIndexing);
}

View File

@@ -63,11 +63,16 @@ public class MainnetBlockImporter implements BlockImporter {
final List<TransactionReceipt> receipts,
final HeaderValidationMode headerValidationMode,
final HeaderValidationMode ommerValidationMode,
final BodyValidationMode bodyValidationMode) {
final BodyValidationMode bodyValidationMode,
final boolean importWithTxIndexing) {
if (blockValidator.validateBlockForSyncing(
context, block, receipts, headerValidationMode, ommerValidationMode, bodyValidationMode)) {
context.getBlockchain().appendBlock(block, receipts);
if (importWithTxIndexing) {
context.getBlockchain().appendBlock(block, receipts);
} else {
context.getBlockchain().appendBlockWithoutIndexingTransactions(block, receipts);
}
return new BlockImportResult(true);
}

View File

@@ -150,7 +150,8 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
attachedValidationPolicy,
ommerValidationPolicy,
ethContext,
fastSyncState.getPivotBlockHeader().get());
fastSyncState.getPivotBlockHeader().get(),
syncConfig.getSnapSyncConfiguration().isSnapSyncTransactionIndexingEnabled());
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",

View File

@@ -47,6 +47,7 @@ public class ImportBlocksStep implements Consumer<List<BlockWithReceipts>> {
private OptionalLong logStartBlock = OptionalLong.empty();
private final BlockHeader pivotHeader;
private final BodyValidationMode bodyValidationMode;
private final boolean transactionIndexingEnabled;
public ImportBlocksStep(
final ProtocolSchedule protocolSchedule,
@@ -54,17 +55,19 @@ public class ImportBlocksStep implements Consumer<List<BlockWithReceipts>> {
final ValidationPolicy headerValidationPolicy,
final ValidationPolicy ommerValidationPolicy,
final EthContext ethContext,
final BlockHeader pivotHeader) {
final BlockHeader pivotHeader,
final boolean transactionIndexingEnabled) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.headerValidationPolicy = headerValidationPolicy;
this.ommerValidationPolicy = ommerValidationPolicy;
this.ethContext = ethContext;
this.pivotHeader = pivotHeader;
bodyValidationMode =
this.bodyValidationMode =
protocolSchedule.anyMatch(scheduledProtocolSpec -> scheduledProtocolSpec.spec().isPoS())
? BodyValidationMode.NONE
: BodyValidationMode.LIGHT;
this.transactionIndexingEnabled = transactionIndexingEnabled;
}
@Override
@@ -125,7 +128,8 @@ public class ImportBlocksStep implements Consumer<List<BlockWithReceipts>> {
blockWithReceipts.getReceipts(),
headerValidationPolicy.getValidationModeForNextBlock(),
ommerValidationPolicy.getValidationModeForNextBlock(),
bodyValidationMode);
bodyValidationMode,
transactionIndexingEnabled);
return blockImportResult.isImported();
}
}

View File

@@ -39,6 +39,7 @@ public class SnapSyncConfiguration {
public static final Boolean DEFAULT_SNAP_SERVER_ENABLED = Boolean.FALSE;
public static final Boolean DEFAULT_SNAP_SYNC_BFT_ENABLED = Boolean.FALSE;
public static final Boolean DEFAULT_SNAP_SYNC_TRANSACTION_INDEXING_ENABLED = Boolean.TRUE;
public static SnapSyncConfiguration getDefault() {
return ImmutableSnapSyncConfiguration.builder().build();
@@ -88,4 +89,9 @@ public class SnapSyncConfiguration {
public Boolean isSnapSyncBftEnabled() {
return DEFAULT_SNAP_SYNC_BFT_ENABLED;
}
@Value.Default
public Boolean isSnapSyncTransactionIndexingEnabled() {
return DEFAULT_SNAP_SYNC_TRANSACTION_INDEXING_ENABLED;
}
}

View File

@@ -73,7 +73,8 @@ public class ImportBlocksStepTest {
validationPolicy,
ommerValidationPolicy,
null,
pivotHeader);
pivotHeader,
true);
}
@Test
@@ -91,13 +92,23 @@ public class ImportBlocksStepTest {
blockWithReceipts.getReceipts(),
FULL,
LIGHT,
BodyValidationMode.LIGHT))
BodyValidationMode.LIGHT,
true))
.thenReturn(new BlockImportResult(true));
}
importBlocksStep.accept(blocksWithReceipts);
for (final BlockWithReceipts blockWithReceipts : blocksWithReceipts) {
verify(protocolSchedule).getByBlockHeader(blockWithReceipts.getHeader());
verify(blockImporter)
.importBlockForSyncing(
protocolContext,
blockWithReceipts.getBlock(),
blockWithReceipts.getReceipts(),
FULL,
LIGHT,
BodyValidationMode.LIGHT,
true);
}
verify(validationPolicy, times(blocks.size())).getValidationModeForNextBlock();
}
@@ -113,9 +124,50 @@ public class ImportBlocksStepTest {
blockWithReceipts.getReceipts(),
FULL,
LIGHT,
BodyValidationMode.LIGHT))
BodyValidationMode.LIGHT,
true))
.thenReturn(new BlockImportResult(false));
assertThatThrownBy(() -> importBlocksStep.accept(singletonList(blockWithReceipts)))
.isInstanceOf(InvalidBlockException.class);
}
@Test
public void shouldImportBlockWithoutTxIndexingWhenNotEnabled() {
ImportBlocksStep importBlocksStep =
new ImportBlocksStep(
protocolSchedule,
protocolContext,
validationPolicy,
ommerValidationPolicy,
null,
pivotHeader,
false);
final Block block = gen.block();
final BlockWithReceipts blockWithReceipts = new BlockWithReceipts(block, gen.receipts(block));
when(blockImporter.importBlockForSyncing(
protocolContext,
blockWithReceipts.getBlock(),
blockWithReceipts.getReceipts(),
FULL,
LIGHT,
BodyValidationMode.LIGHT,
false))
.thenReturn(new BlockImportResult(true));
importBlocksStep.accept(List.of(blockWithReceipts));
verify(protocolSchedule).getByBlockHeader(blockWithReceipts.getHeader());
verify(validationPolicy, times(1)).getValidationModeForNextBlock();
verify(blockImporter)
.importBlockForSyncing(
protocolContext,
blockWithReceipts.getBlock(),
blockWithReceipts.getReceipts(),
FULL,
LIGHT,
BodyValidationMode.LIGHT,
false);
}
}