coordinator: carveout conflation app (#1138)

* coordinator: carveout conflation app

* coordinator: stop conflation app

* coordinator: fix client setup

* coordinator: fix spotless
Fluent Crafter authored on 2025-06-12 09:17:02 +01:00, committed by GitHub
Parent: 434b4ca3e6
Commit: 8461bde17e
6 changed files with 662 additions and 560 deletions
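
At a glance, this change carves the conflation wiring out of L1DependentApp into a dedicated ConflationApp with its own LongRunningService lifecycle, plus a ConflationAppHelper for the resume/cleanup logic and a MessageAnchoringAppConfigurator for message anchoring setup. A minimal sketch of how the host app now drives the sub-app, assuming the constructor and methods shown in the diff below (finalizedL1BlockNumber is a placeholder):

val conflationApp = ConflationApp(
    vertx = vertx,
    configs = configs,
    batchesRepository = batchesRepository,
    blobsRepository = blobsRepository,
    aggregationsRepository = aggregationsRepository,
    lastFinalizedBlock = lastFinalizedBlock,
    metricsFacade = metricsFacade,
    httpJsonRpcClientFactory = httpJsonRpcClientFactory,
)
// L1DependentApp delegates the lifecycle and L1 finalization updates to the sub-app:
conflationApp.start()
conflationApp.updateLatestL1FinalizedBlock(finalizedL1BlockNumber)
conflationApp.stop()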

@@ -24,6 +24,7 @@ endpoints = ["http://127.0.0.1:8998/"]
[type2-state-proof-provider]
disabled = true
endpoints = ["http://127.0.0.1:8889/"]
[l1-finalization-monitor]
l1-query-block-tag="LATEST"

@@ -1,22 +1,16 @@
package net.consensys.zkevm.coordinator.app
import build.linea.clients.StateManagerClientV1
import build.linea.clients.StateManagerV1JsonRpcClient
import io.vertx.core.Vertx
import io.vertx.sqlclient.SqlClient
import kotlinx.datetime.Clock
import linea.anchoring.MessageAnchoringApp
import linea.blob.ShnarfCalculatorVersion
import linea.contract.l1.LineaRollupSmartContractClientReadOnly
import linea.contract.l1.Web3JLineaRollupSmartContractClientReadOnly
import linea.contract.l2.Web3JL2MessageServiceSmartContractClient
import linea.coordinator.config.toJsonRpcRetry
import linea.coordinator.config.v2.CoordinatorConfig
import linea.coordinator.config.v2.isDisabled
import linea.coordinator.config.v2.isEnabled
import linea.domain.BlockNumberAndHash
import linea.domain.RetryConfig
import linea.encoding.BlockRLPEncoder
import linea.kotlin.toKWeiUInt
import linea.web3j.ExtendedWeb3JImpl
import linea.web3j.SmartContractErrors
@@ -24,7 +18,6 @@ import linea.web3j.Web3jBlobExtended
import linea.web3j.createWeb3jHttpClient
import linea.web3j.createWeb3jHttpService
import linea.web3j.ethapi.createEthApiClient
import net.consensys.linea.contract.l1.GenesisStateProvider
import net.consensys.linea.ethereum.gaspricing.BoundableFeeCalculator
import net.consensys.linea.ethereum.gaspricing.FeesCalculator
import net.consensys.linea.ethereum.gaspricing.FeesFetcher
@@ -45,53 +38,18 @@ import net.consensys.linea.ethereum.gaspricing.staticcap.VariableFeesCalculator
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.LineaMetricsCategory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.traces.TracesCountersV2
import net.consensys.zkevm.LongRunningService
import net.consensys.zkevm.coordinator.blockcreation.BatchesRepoBasedLastProvenBlockNumberProvider
import net.consensys.zkevm.coordinator.blockcreation.BlockCreationMonitor
import net.consensys.zkevm.coordinator.blockcreation.GethCliqueSafeBlockProvider
import net.consensys.zkevm.coordinator.clients.ExecutionProverClientV2
import net.consensys.zkevm.coordinator.app.conflation.ConflationApp
import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeConflationFrom
import net.consensys.zkevm.coordinator.clients.ShomeiClient
import net.consensys.zkevm.coordinator.clients.TracesGeneratorJsonRpcClientV2
import net.consensys.zkevm.coordinator.clients.prover.ProverClientFactory
import net.consensys.zkevm.coordinator.clients.smartcontract.LineaRollupSmartContractClient
import net.consensys.zkevm.domain.BlobSubmittedEvent
import net.consensys.zkevm.domain.BlocksConflation
import net.consensys.zkevm.domain.FinalizationSubmittedEvent
import net.consensys.zkevm.ethereum.coordination.EventDispatcher
import net.consensys.zkevm.ethereum.coordination.HighestConflationTracker
import net.consensys.zkevm.ethereum.coordination.HighestProvenBatchTracker
import net.consensys.zkevm.ethereum.coordination.HighestProvenBlobTracker
import net.consensys.zkevm.ethereum.coordination.HighestULongTracker
import net.consensys.zkevm.ethereum.coordination.HighestUnprovenBlobTracker
import net.consensys.zkevm.ethereum.coordination.LatestBlobSubmittedBlockNumberTracker
import net.consensys.zkevm.ethereum.coordination.LatestFinalizationSubmittedBlockNumberTracker
import net.consensys.zkevm.ethereum.coordination.SimpleCompositeSafeFutureHandler
import net.consensys.zkevm.ethereum.coordination.aggregation.ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker
import net.consensys.zkevm.ethereum.coordination.aggregation.ProofAggregationCoordinatorService
import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofCoordinator
import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofUpdate
import net.consensys.zkevm.ethereum.coordination.blob.BlobZkStateProviderImpl
import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobCompressor
import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobShnarfCalculator
import net.consensys.zkevm.ethereum.coordination.blob.RollingBlobShnarfCalculator
import net.consensys.zkevm.ethereum.coordination.blockcreation.ForkChoiceUpdaterImpl
import net.consensys.zkevm.ethereum.coordination.conflation.BlockToBatchSubmissionCoordinator
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByBlockLimit
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByDataCompressed
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByExecutionTraces
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTargetBlockNumbers
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTimeDeadline
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationService
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationServiceImpl
import net.consensys.zkevm.ethereum.coordination.conflation.DeadlineConflationCalculatorRunner
import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlobAwareConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlockConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.ProofGeneratingConflationHandlerImpl
import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCoordinatorImpl
import net.consensys.zkevm.ethereum.coordination.proofcreation.ZkProofCreationCoordinatorImpl
import net.consensys.zkevm.ethereum.finalization.AggregationFinalizationCoordinator
import net.consensys.zkevm.ethereum.finalization.AggregationSubmitterImpl
import net.consensys.zkevm.ethereum.finalization.FinalizationHandler
@@ -103,11 +61,9 @@ import net.consensys.zkevm.persistence.AggregationsRepository
import net.consensys.zkevm.persistence.BatchesRepository
import net.consensys.zkevm.persistence.BlobsRepository
import net.consensys.zkevm.persistence.dao.aggregation.RecordsCleanupFinalizationHandler
import net.consensys.zkevm.persistence.dao.batch.persistence.BatchProofHandlerImpl
import net.consensys.zkevm.persistence.dao.feehistory.FeeHistoriesPostgresDao
import net.consensys.zkevm.persistence.dao.feehistory.FeeHistoriesRepositoryImpl
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.web3j.protocol.Web3j
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.CompletableFuture
@@ -139,10 +95,6 @@ class L1DependentApp(
log.warn("L1 submission disabled for aggregations")
}
}
if (configs.messageAnchoring.isDisabled()) {
log.warn("Message anchoring is disabled")
}
if (configs.l2NetworkGasPricing.isDisabled()) {
log.warn("L2 Network dynamic gas pricing is disabled")
}
@@ -160,12 +112,6 @@ class L1DependentApp(
).getChainId().get()
}
private val proverClientFactory = ProverClientFactory(
vertx = vertx,
config = configs.proversConfig,
metricsFacade = metricsFacade,
)
private val finalizationTransactionManager = createTransactionManager(
vertx = vertx,
signerConfig = configs.l1Submission!!.aggregation.signer,
@@ -344,122 +290,6 @@ class L1DependentApp(
aggregationsRepository,
lastFinalizedBlock,
).get()
private val lastConsecutiveAggregatedBlockNumber = resumeAggregationFrom(
aggregationsRepository,
lastFinalizedBlock,
).get()
val l2Web3jClientForBlockCreation: Web3j = createWeb3jHttpClient(
rpcUrl = configs.conflation.l2Endpoint.toString(),
log = LogManager.getLogger("clients.l2.eth.conflation"),
)
private fun createDeadlineConflationCalculatorRunner(): DeadlineConflationCalculatorRunner? {
if (configs.conflation.isDisabled() || configs.conflation.conflationDeadline == null) {
log.info("Conflation deadline calculator is disabled")
return null
}
return DeadlineConflationCalculatorRunner(
conflationDeadlineCheckInterval = configs.conflation.conflationDeadlineCheckInterval,
delegate = ConflationCalculatorByTimeDeadline(
config = ConflationCalculatorByTimeDeadline.Config(
conflationDeadline = configs.conflation.conflationDeadline,
conflationDeadlineLastBlockConfirmationDelay =
configs.conflation.conflationDeadlineLastBlockConfirmationDelay,
),
lastBlockNumber = lastProcessedBlockNumber,
clock = Clock.System,
latestBlockProvider = GethCliqueSafeBlockProvider(
l2Web3jClientForBlockCreation,
GethCliqueSafeBlockProvider.Config(blocksToFinalization = 0),
),
),
)
}
private val deadlineConflationCalculatorRunner = createDeadlineConflationCalculatorRunner()
private fun addBlocksLimitCalculatorIfDefined(calculators: MutableList<ConflationCalculator>) {
if (configs.conflation.blocksLimit != null) {
calculators.add(
ConflationCalculatorByBlockLimit(
blockLimit = configs.conflation.blocksLimit,
),
)
}
}
private fun addTargetEndBlockConflationCalculatorIfDefined(calculators: MutableList<ConflationCalculator>) {
if (configs.conflation.proofAggregation.targetEndBlocks?.isNotEmpty() ?: false) {
calculators.add(
ConflationCalculatorByTargetBlockNumbers(
targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks!!.toSet(),
),
)
}
}
private fun createCalculatorsForBlobsAndConflation(
logger: Logger,
compressedBlobCalculator: ConflationCalculatorByDataCompressed,
): List<ConflationCalculator> {
val calculators: MutableList<ConflationCalculator> =
mutableListOf(
ConflationCalculatorByExecutionTraces(
tracesCountersLimit = configs.conflation.tracesLimitsV2,
emptyTracesCounters = TracesCountersV2.EMPTY_TRACES_COUNT,
metricsFacade = metricsFacade,
log = logger,
),
compressedBlobCalculator,
)
addBlocksLimitCalculatorIfDefined(calculators)
addTargetEndBlockConflationCalculatorIfDefined(calculators)
return calculators
}
private val conflationCalculator: TracesConflationCalculator = run {
val logger = LogManager.getLogger(GlobalBlockConflationCalculator::class.java)
// To fail faster for JNA reasons
val blobCompressor = GoBackedBlobCompressor.getInstance(
compressorVersion = configs.conflation.blobCompression.blobCompressorVersion,
dataLimit = configs.conflation.blobCompression.blobSizeLimit,
metricsFacade = metricsFacade,
)
val compressedBlobCalculator = ConflationCalculatorByDataCompressed(
blobCompressor = blobCompressor,
)
val globalCalculator = GlobalBlockConflationCalculator(
lastBlockNumber = lastProcessedBlockNumber,
syncCalculators = createCalculatorsForBlobsAndConflation(logger, compressedBlobCalculator),
deferredTriggerConflationCalculators = listOfNotNull(deadlineConflationCalculatorRunner),
emptyTracesCounters = TracesCountersV2.EMPTY_TRACES_COUNT,
log = logger,
)
val batchesLimit = configs.conflation.blobCompression.batchesLimit
?: (configs.conflation.proofAggregation.proofsLimit - 1U)
GlobalBlobAwareConflationCalculator(
conflationCalculator = globalCalculator,
blobCalculator = compressedBlobCalculator,
metricsFacade = metricsFacade,
batchesLimit = batchesLimit,
)
}
private val conflationService: ConflationService =
ConflationServiceImpl(calculator = conflationCalculator, metricsFacade = metricsFacade)
private val zkStateClient: StateManagerClientV1 = StateManagerV1JsonRpcClient.create(
rpcClientFactory = httpJsonRpcClientFactory,
endpoints = configs.stateManager.endpoints.map { it.toURI() },
maxInflightRequestsPerClient = configs.stateManager.requestLimitPerEndpoint,
requestRetry = configs.stateManager.requestRetries.toJsonRpcRetry(),
zkStateManagerVersion = configs.stateManager.version,
logger = LogManager.getLogger("clients.StateManagerShomeiClient"),
)
private val lineaSmartContractClientForDataSubmission: LineaRollupSmartContractClient = run {
// The below gas provider will act as the primary gas provider if L1
@@ -496,67 +326,6 @@ class L1DependentApp(
)
}
private val genesisStateProvider = GenesisStateProvider(
stateRootHash = configs.protocol.genesis.genesisStateRootHash,
shnarf = configs.protocol.genesis.genesisShnarf,
)
private val blobCompressionProofCoordinator = run {
val maxProvenBlobCache = run {
val highestProvenBlobTracker = HighestProvenBlobTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
name = "proven.highest.block.number",
description = "Highest proven blob compression block number",
measurementSupplier = highestProvenBlobTracker,
)
highestProvenBlobTracker
}
val blobCompressionProofHandler: (BlobCompressionProofUpdate) -> SafeFuture<*> = SimpleCompositeSafeFutureHandler(
listOf(
maxProvenBlobCache,
),
)
val blobCompressionProofCoordinator = BlobCompressionProofCoordinator(
vertx = vertx,
blobsRepository = blobsRepository,
blobCompressionProverClient = proverClientFactory.blobCompressionProverClient(),
rollingBlobShnarfCalculator = RollingBlobShnarfCalculator(
blobShnarfCalculator = GoBackedBlobShnarfCalculator(
version = ShnarfCalculatorVersion.V1_2,
metricsFacade = metricsFacade,
),
blobsRepository = blobsRepository,
genesisShnarf = genesisStateProvider.shnarf,
),
blobZkStateProvider = BlobZkStateProviderImpl(
zkStateClient = zkStateClient,
),
config = BlobCompressionProofCoordinator.Config(
pollingInterval = configs.conflation.blobCompression.handlerPollingInterval,
),
blobCompressionProofHandler = blobCompressionProofHandler,
metricsFacade = metricsFacade,
)
val highestUnprovenBlobTracker = HighestUnprovenBlobTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
name = "unproven.highest.block.number",
description = "Block number of highest unproven blob produced",
measurementSupplier = highestUnprovenBlobTracker,
)
val compositeSafeFutureHandler = SimpleCompositeSafeFutureHandler(
listOf(
blobCompressionProofCoordinator::handleBlob,
highestUnprovenBlobTracker,
),
)
conflationCalculator.onBlobCreation(compositeSafeFutureHandler)
blobCompressionProofCoordinator
}
private val highestAcceptedBlobTracker = HighestULongTracker(lastProcessedBlockNumber).also {
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
@@ -572,11 +341,11 @@ class L1DependentApp(
acceptedBlobEndBlockNumberConsumer = { highestAcceptedBlobTracker(it) },
)
private val latestBlobSubmittedBlockNumberTracker = LatestBlobSubmittedBlockNumberTracker(0UL)
private val blobSubmissionCoordinator = run {
if (configs.l1Submission.isDisabled() || configs.l1Submission!!.blob.isDisabled()) {
DisabledLongRunningService
} else {
val latestBlobSubmittedBlockNumberTracker = LatestBlobSubmittedBlockNumberTracker(0UL)
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
name = "highest.submitted.on.l1",
@@ -621,69 +390,6 @@ class L1DependentApp(
}
}
private val proofAggregationCoordinatorService: LongRunningService = run {
val maxBlobEndBlockNumberTracker = ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker(
aggregationsRepository,
lastProcessedBlockNumber,
)
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
name = "proven.highest.consecutive.block.number",
description = "Highest consecutive proven blob compression block number",
measurementSupplier = maxBlobEndBlockNumberTracker,
)
val highestAggregationTracker = HighestULongTracker(lastConsecutiveAggregatedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.AGGREGATION,
name = "proven.highest.block.number",
description = "Highest proven aggregation block number",
measurementSupplier = highestAggregationTracker,
)
val l2Web3jClient = createWeb3jHttpClient(
rpcUrl = configs.conflation.l2Endpoint.toString(),
log = LogManager.getLogger("clients.l2.eth.conflation"),
)
ProofAggregationCoordinatorService
.create(
vertx = vertx,
aggregationCoordinatorPollingInterval = configs.conflation.proofAggregation.coordinatorPollingInterval,
deadlineCheckInterval = configs.conflation.proofAggregation.deadlineCheckInterval,
aggregationDeadline = configs.conflation.proofAggregation.deadline,
latestBlockProvider = GethCliqueSafeBlockProvider(
web3j = l2Web3jClient,
config = GethCliqueSafeBlockProvider.Config(0),
),
maxProofsPerAggregation = configs.conflation.proofAggregation.proofsLimit,
startBlockNumberInclusive = lastConsecutiveAggregatedBlockNumber + 1u,
aggregationsRepository = aggregationsRepository,
consecutiveProvenBlobsProvider = maxBlobEndBlockNumberTracker,
proofAggregationClient = proverClientFactory.proofAggregationProverClient(),
l2EthApiClient = createEthApiClient(
l2Web3jClient,
requestRetryConfig = linea.domain.RetryConfig(
backoffDelay = 1.seconds,
failuresWarningThreshold = 3u,
),
vertx = vertx,
),
l2MessageService = Web3JL2MessageServiceSmartContractClient.createReadOnly(
web3jClient = l2Web3jClient,
contractAddress = configs.protocol.l2.contractAddress,
smartContractErrors = smartContractErrors,
smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(),
),
aggregationDeadlineDelay = configs.conflation.conflationDeadlineLastBlockConfirmationDelay,
targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks ?: emptyList(),
metricsFacade = metricsFacade,
provenAggregationEndBlockNumberConsumer = { aggEndBlockNumber -> highestAggregationTracker(aggEndBlockNumber) },
aggregationSizeMultipleOf = configs.conflation.proofAggregation.aggregationSizeMultipleOf,
)
}
private val aggregationFinalizationCoordinator = run {
if (configs.l1Submission.isDisabled() || configs.l1Submission?.aggregation.isDisabled()) {
DisabledLongRunningService
@@ -761,152 +467,6 @@ class L1DependentApp(
}
}
private val block2BatchCoordinator = run {
val tracesCountersClient = run {
val tracesCountersLog = LogManager.getLogger("clients.traces.counters")
TracesGeneratorJsonRpcClientV2(
vertx = vertx,
rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing(
endpoints = configs.traces.counters.endpoints.toSet(),
maxInflightRequestsPerClient = configs.traces.counters.requestLimitPerEndpoint,
log = tracesCountersLog,
),
config = TracesGeneratorJsonRpcClientV2.Config(
expectedTracesApiVersion = configs.traces.expectedTracesApiVersion,
),
retryConfig = configs.traces.counters.requestRetries.toJsonRpcRetry(),
log = tracesCountersLog,
)
}
val tracesConflationClient = run {
val tracesConflationLog = LogManager.getLogger("clients.traces.conflation")
TracesGeneratorJsonRpcClientV2(
vertx = vertx,
rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing(
endpoints = configs.traces.conflation.endpoints.toSet(),
maxInflightRequestsPerClient = configs.traces.conflation.requestLimitPerEndpoint,
log = tracesConflationLog,
),
config = TracesGeneratorJsonRpcClientV2.Config(
expectedTracesApiVersion = configs.traces.expectedTracesApiVersion,
),
retryConfig = configs.traces.conflation.requestRetries.toJsonRpcRetry(),
log = tracesConflationLog,
)
}
val blobsConflationHandler: (BlocksConflation) -> SafeFuture<*> = run {
val maxProvenBatchCache = run {
val highestProvenBatchTracker = HighestProvenBatchTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.BATCH,
name = "proven.highest.block.number",
description = "Highest proven batch execution block number",
measurementSupplier = highestProvenBatchTracker,
)
highestProvenBatchTracker
}
val batchProofHandler = SimpleCompositeSafeFutureHandler(
listOf(
maxProvenBatchCache,
BatchProofHandlerImpl(batchesRepository)::acceptNewBatch,
),
)
val executionProverClient: ExecutionProverClientV2 = proverClientFactory.executionProverClient(
// we cannot use configs.traces.expectedTracesApiVersion because it breaks the prover's expected version pattern
tracesVersion = "2.1.0",
stateManagerVersion = configs.stateManager.version,
)
val proofGeneratingConflationHandlerImpl = ProofGeneratingConflationHandlerImpl(
tracesProductionCoordinator = TracesConflationCoordinatorImpl(tracesConflationClient, zkStateClient),
zkProofProductionCoordinator = ZkProofCreationCoordinatorImpl(
executionProverClient = executionProverClient,
l2EthApiClient = createEthApiClient(
rpcUrl = configs.conflation.l2Endpoint.toString(),
log = LogManager.getLogger("clients.l2.eth.conflation"),
requestRetryConfig = configs.conflation.l2RequestRetries,
vertx = vertx,
),
messageServiceAddress = configs.protocol.l2.contractAddress,
),
batchProofHandler = batchProofHandler,
vertx = vertx,
config = ProofGeneratingConflationHandlerImpl.Config(5.seconds),
)
val highestConflationTracker = HighestConflationTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.CONFLATION,
name = "last.block.number",
description = "Last conflated block number",
measurementSupplier = highestConflationTracker,
)
val conflationsCounter = metricsFacade.createCounter(
category = LineaMetricsCategory.CONFLATION,
name = "counter",
description = "Counter of new conflations",
)
SimpleCompositeSafeFutureHandler(
listOf(
proofGeneratingConflationHandlerImpl::handleConflatedBatch,
highestConflationTracker,
{
conflationsCounter.increment()
SafeFuture.COMPLETE
},
),
)
}
conflationService.onConflatedBatch(blobsConflationHandler)
BlockToBatchSubmissionCoordinator(
conflationService = conflationService,
tracesCountersClient = tracesCountersClient,
vertx = vertx,
encoder = BlockRLPEncoder,
)
}
private val lastProvenBlockNumberProvider = run {
val lastProvenConsecutiveBatchBlockNumberProvider = BatchesRepoBasedLastProvenBlockNumberProvider(
lastProcessedBlockNumber.toLong(),
batchesRepository,
)
metricsFacade.createGauge(
category = LineaMetricsCategory.BATCH,
name = "proven.highest.consecutive.block.number",
description = "Highest proven consecutive execution batch block number",
measurementSupplier = { lastProvenConsecutiveBatchBlockNumberProvider.getLastKnownProvenBlockNumber() },
)
lastProvenConsecutiveBatchBlockNumberProvider
}
private val blockCreationMonitor = run {
log.info("Resuming conflation from block={} inclusive", lastProcessedBlockNumber + 1UL)
val blockCreationMonitor = BlockCreationMonitor(
vertx = vertx,
web3j = ExtendedWeb3JImpl(l2Web3jClientForBlockCreation),
startingBlockNumberExclusive = lastProcessedBlockNumber.toLong(),
blockCreationListener = block2BatchCoordinator,
lastProvenBlockNumberProviderAsync = lastProvenBlockNumberProvider,
config = BlockCreationMonitor.Config(
pollingInterval = configs.conflation.blocksPollingInterval,
blocksToFinalization = 0L,
blocksFetchLimit = configs.conflation.l2FetchBlocksLimit.toLong(),
// We need to add 1 to forceStopConflationAtBlockInclusive because conflation calculator requires
// block_number = forceStopConflationAtBlockInclusive + 1 to trigger conflation at
// forceStopConflationAtBlockInclusive
lastL2BlockNumberToProcessInclusive = configs.conflation.forceStopConflationAtBlockInclusive?.inc(),
),
)
blockCreationMonitor
}
private fun lastFinalizedBlock(): SafeFuture<ULong> {
val l1BasedLastFinalizedBlockProvider = L1BasedLastFinalizedBlockProvider(
vertx,
@@ -916,56 +476,21 @@ class L1DependentApp(
return l1BasedLastFinalizedBlockProvider.getLastFinalizedBlock()
}
private val messageAnchoringApp: LongRunningService = if (configs.messageAnchoring.isEnabled()) {
configs.messageAnchoring!!
val l1Web3jClient = createWeb3jHttpClient(
rpcUrl = configs.messageAnchoring.l1Endpoint.toString(),
log = LogManager.getLogger("clients.l1.eth.message-anchoring"),
)
val l2Web3jClient = createWeb3jHttpClient(
rpcUrl = configs.messageAnchoring.l2Endpoint.toString(),
log = LogManager.getLogger("clients.l2.eth.message-anchoring"),
)
val l2TransactionManager = createTransactionManager(
vertx = vertx,
signerConfig = configs.messageAnchoring.signer,
client = l2Web3jClient,
)
MessageAnchoringApp(
vertx = vertx,
config = MessageAnchoringApp.Config(
l1RequestRetryConfig = configs.messageAnchoring.l1RequestRetries,
l1PollingInterval = configs.messageAnchoring.l1EventScrapping.pollingInterval,
l1SuccessBackoffDelay = configs.messageAnchoring.l1EventScrapping.ethLogsSearchSuccessBackoffDelay,
l1ContractAddress = configs.protocol.l1.contractAddress,
l1EventPollingTimeout = configs.messageAnchoring.l1EventScrapping.pollingTimeout,
l1EventSearchBlockChunk = configs.messageAnchoring.l1EventScrapping.ethLogsSearchBlockChunkSize,
l1HighestBlockTag = configs.messageAnchoring.l1HighestBlockTag,
l2HighestBlockTag = configs.messageAnchoring.l2HighestBlockTag,
anchoringTickInterval = configs.messageAnchoring.anchoringTickInterval,
messageQueueCapacity = configs.messageAnchoring.messageQueueCapacity,
maxMessagesToAnchorPerL2Transaction = configs.messageAnchoring.maxMessagesToAnchorPerL2Transaction,
),
l1EthApiClient = createEthApiClient(
web3jClient = l1Web3jClient,
requestRetryConfig = null,
vertx = vertx,
),
l2MessageService = Web3JL2MessageServiceSmartContractClient.create(
web3jClient = l2Web3jClient,
contractAddress = configs.protocol.l2.contractAddress,
gasLimit = configs.messageAnchoring.gas.gasLimit,
maxFeePerGasCap = configs.messageAnchoring.gas.maxFeePerGasCap,
feeHistoryBlockCount = configs.messageAnchoring.gas.feeHistoryBlockCount,
feeHistoryRewardPercentile = configs.messageAnchoring.gas.feeHistoryRewardPercentile.toDouble(),
transactionManager = l2TransactionManager,
smartContractErrors = smartContractErrors,
smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(),
),
)
} else {
DisabledLongRunningService
}
private val messageAnchoringApp: LongRunningService = MessageAnchoringAppConfigurator.create(
vertx = vertx,
configs = configs,
)
private val conflationApp = ConflationApp(
vertx = vertx,
configs = configs,
batchesRepository = batchesRepository,
blobsRepository = blobsRepository,
aggregationsRepository = aggregationsRepository,
lastFinalizedBlock = lastFinalizedBlock,
metricsFacade = metricsFacade,
httpJsonRpcClientFactory = httpJsonRpcClientFactory,
)
private val l2NetworkGasPricingService: L2NetworkGasPricingService? =
if (configs.l2NetworkGasPricing.isEnabled()) {
@@ -1103,7 +628,7 @@ class L1DependentApp(
init {
mapOf(
"last_proven_block_provider" to FinalizationHandler { update: FinalizationMonitor.FinalizationUpdate ->
lastProvenBlockNumberProvider.updateLatestL1FinalizedBlock(update.blockNumber.toLong())
conflationApp.updateLatestL1FinalizedBlock(update.blockNumber.toLong())
},
"finalized records cleanup" to RecordsCleanupFinalizationHandler(
batchesRepository = batchesRepository,
@@ -1120,24 +645,14 @@ class L1DependentApp(
}
override fun start(): CompletableFuture<Unit> {
return cleanupDbDataAfterBlockNumbers(
lastProcessedBlockNumber = lastProcessedBlockNumber,
lastConsecutiveAggregatedBlockNumber = lastConsecutiveAggregatedBlockNumber,
batchesRepository = batchesRepository,
blobsRepository = blobsRepository,
aggregationsRepository = aggregationsRepository,
)
.thenCompose { l1FinalizationMonitor.start() }
return l1FinalizationMonitor.start()
.thenCompose { l1FinalizationHandlerForShomeiRpc.start() }
.thenCompose { blobSubmissionCoordinator.start() }
.thenCompose { aggregationFinalizationCoordinator.start() }
.thenCompose { proofAggregationCoordinatorService.start() }
.thenCompose { messageAnchoringApp.start() }
.thenCompose { conflationApp.start() }
.thenCompose { l2NetworkGasPricingService?.start() ?: SafeFuture.completedFuture(Unit) }
.thenCompose { l1FeeHistoryCachingService.start() }
.thenCompose { deadlineConflationCalculatorRunner?.start() ?: SafeFuture.completedFuture(Unit) }
.thenCompose { blockCreationMonitor.start() }
.thenCompose { blobCompressionProofCoordinator.start() }
.thenPeek {
log.info("L1App started")
}
@@ -1145,69 +660,19 @@ class L1DependentApp(
override fun stop(): CompletableFuture<Unit> {
return SafeFuture.allOf(
conflationApp.stop(),
l1FinalizationMonitor.stop(),
l1FinalizationHandlerForShomeiRpc.stop(),
blobSubmissionCoordinator.stop(),
aggregationFinalizationCoordinator.stop(),
proofAggregationCoordinatorService.stop(),
messageAnchoringApp.stop(),
l2NetworkGasPricingService?.stop() ?: SafeFuture.completedFuture(Unit),
l1FeeHistoryCachingService.stop(),
blockCreationMonitor.stop(),
deadlineConflationCalculatorRunner?.stop() ?: SafeFuture.completedFuture(Unit),
blobCompressionProofCoordinator.stop(),
)
.thenApply { log.info("L1App Stopped") }
}
companion object {
fun cleanupDbDataAfterBlockNumbers(
lastProcessedBlockNumber: ULong,
lastConsecutiveAggregatedBlockNumber: ULong,
batchesRepository: BatchesRepository,
blobsRepository: BlobsRepository,
aggregationsRepository: AggregationsRepository,
): SafeFuture<*> {
val blockNumberInclusiveToDeleteFrom = lastProcessedBlockNumber + 1u
val cleanupBatches = batchesRepository.deleteBatchesAfterBlockNumber(blockNumberInclusiveToDeleteFrom.toLong())
val cleanupBlobs = blobsRepository.deleteBlobsAfterBlockNumber(blockNumberInclusiveToDeleteFrom)
val cleanupAggregations = aggregationsRepository
.deleteAggregationsAfterBlockNumber((lastConsecutiveAggregatedBlockNumber + 1u).toLong())
return SafeFuture.allOf(cleanupBatches, cleanupBlobs, cleanupAggregations)
}
/**
* Returns the last block number inclusive up to which we have consecutive proven blobs or the last finalized block
* number inclusive
*/
fun resumeConflationFrom(
aggregationsRepository: AggregationsRepository,
lastFinalizedBlock: ULong,
): SafeFuture<ULong> {
return aggregationsRepository
.findConsecutiveProvenBlobs(lastFinalizedBlock.toLong() + 1)
.thenApply { blobAndBatchCounters ->
if (blobAndBatchCounters.isNotEmpty()) {
blobAndBatchCounters.last().blobCounters.endBlockNumber
} else {
lastFinalizedBlock
}
}
}
fun resumeAggregationFrom(
aggregationsRepository: AggregationsRepository,
lastFinalizedBlock: ULong,
): SafeFuture<ULong> {
return aggregationsRepository
.findHighestConsecutiveEndBlockNumber(lastFinalizedBlock.toLong() + 1)
.thenApply { highestEndBlockNumber ->
highestEndBlockNumber?.toULong() ?: lastFinalizedBlock
}
}
fun setupL1FinalizationMonitorForShomeiFrontend(
type2StateProofProviderConfig: linea.coordinator.config.v2.Type2StateProofManagerConfig,
httpJsonRpcClientFactory: VertxHttpJsonRpcClientFactory,

@@ -0,0 +1,71 @@
package net.consensys.zkevm.coordinator.app
import io.vertx.core.Vertx
import linea.anchoring.MessageAnchoringApp
import linea.contract.l2.Web3JL2MessageServiceSmartContractClient
import linea.coordinator.config.v2.CoordinatorConfig
import linea.coordinator.config.v2.isDisabled
import linea.web3j.createWeb3jHttpClient
import linea.web3j.ethapi.createEthApiClient
import net.consensys.zkevm.LongRunningService
import org.apache.logging.log4j.LogManager
object MessageAnchoringAppConfigurator {
fun create(
vertx: Vertx,
configs: CoordinatorConfig,
): LongRunningService {
if (configs.messageAnchoring.isDisabled()) {
LogManager.getLogger(MessageAnchoringApp::class.java).warn("Message anchoring is disabled")
return DisabledLongRunningService
}
configs.messageAnchoring!!
val l1Web3jClient = createWeb3jHttpClient(
rpcUrl = configs.messageAnchoring.l1Endpoint.toString(),
log = LogManager.getLogger("clients.l1.eth.message-anchoring"),
)
val l2Web3jClient = createWeb3jHttpClient(
rpcUrl = configs.messageAnchoring.l2Endpoint.toString(),
log = LogManager.getLogger("clients.l2.eth.message-anchoring"),
)
val l2TransactionManager = createTransactionManager(
vertx = vertx,
signerConfig = configs.messageAnchoring.signer,
client = l2Web3jClient,
)
val messageAnchoringApp = MessageAnchoringApp(
vertx = vertx,
config = MessageAnchoringApp.Config(
l1RequestRetryConfig = configs.messageAnchoring.l1RequestRetries,
l1PollingInterval = configs.messageAnchoring.l1EventScrapping.pollingInterval,
l1SuccessBackoffDelay = configs.messageAnchoring.l1EventScrapping.ethLogsSearchSuccessBackoffDelay,
l1ContractAddress = configs.protocol.l1.contractAddress,
l1EventPollingTimeout = configs.messageAnchoring.l1EventScrapping.pollingTimeout,
l1EventSearchBlockChunk = configs.messageAnchoring.l1EventScrapping.ethLogsSearchBlockChunkSize,
l1HighestBlockTag = configs.messageAnchoring.l1HighestBlockTag,
l2HighestBlockTag = configs.messageAnchoring.l2HighestBlockTag,
anchoringTickInterval = configs.messageAnchoring.anchoringTickInterval,
messageQueueCapacity = configs.messageAnchoring.messageQueueCapacity,
maxMessagesToAnchorPerL2Transaction = configs.messageAnchoring.maxMessagesToAnchorPerL2Transaction,
),
l1EthApiClient = createEthApiClient(
web3jClient = l1Web3jClient,
requestRetryConfig = null,
vertx = vertx,
),
l2MessageService = Web3JL2MessageServiceSmartContractClient.create(
web3jClient = l2Web3jClient,
contractAddress = configs.protocol.l2.contractAddress,
gasLimit = configs.messageAnchoring.gas.gasLimit,
maxFeePerGasCap = configs.messageAnchoring.gas.maxFeePerGasCap,
feeHistoryBlockCount = configs.messageAnchoring.gas.feeHistoryBlockCount,
feeHistoryRewardPercentile = configs.messageAnchoring.gas.feeHistoryRewardPercentile.toDouble(),
transactionManager = l2TransactionManager,
smartContractErrors = configs.smartContractErrors,
smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(),
),
)
return messageAnchoringApp
}
}

@@ -0,0 +1,509 @@
package net.consensys.zkevm.coordinator.app.conflation
import build.linea.clients.StateManagerClientV1
import build.linea.clients.StateManagerV1JsonRpcClient
import io.vertx.core.Vertx
import kotlinx.datetime.Clock
import linea.blob.ShnarfCalculatorVersion
import linea.contract.l2.Web3JL2MessageServiceSmartContractClient
import linea.coordinator.config.toJsonRpcRetry
import linea.coordinator.config.v2.CoordinatorConfig
import linea.coordinator.config.v2.isDisabled
import linea.domain.RetryConfig
import linea.encoding.BlockRLPEncoder
import linea.web3j.ExtendedWeb3JImpl
import linea.web3j.createWeb3jHttpClient
import linea.web3j.ethapi.createEthApiClient
import net.consensys.linea.contract.l1.GenesisStateProvider
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.LineaMetricsCategory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.traces.TracesCountersV2
import net.consensys.zkevm.LongRunningService
import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.cleanupDbDataAfterBlockNumbers
import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeAggregationFrom
import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeConflationFrom
import net.consensys.zkevm.coordinator.blockcreation.BatchesRepoBasedLastProvenBlockNumberProvider
import net.consensys.zkevm.coordinator.blockcreation.BlockCreationMonitor
import net.consensys.zkevm.coordinator.blockcreation.GethCliqueSafeBlockProvider
import net.consensys.zkevm.coordinator.clients.ExecutionProverClientV2
import net.consensys.zkevm.coordinator.clients.TracesGeneratorJsonRpcClientV2
import net.consensys.zkevm.coordinator.clients.prover.ProverClientFactory
import net.consensys.zkevm.domain.BlocksConflation
import net.consensys.zkevm.ethereum.coordination.HighestConflationTracker
import net.consensys.zkevm.ethereum.coordination.HighestProvenBatchTracker
import net.consensys.zkevm.ethereum.coordination.HighestProvenBlobTracker
import net.consensys.zkevm.ethereum.coordination.HighestULongTracker
import net.consensys.zkevm.ethereum.coordination.HighestUnprovenBlobTracker
import net.consensys.zkevm.ethereum.coordination.SimpleCompositeSafeFutureHandler
import net.consensys.zkevm.ethereum.coordination.aggregation.ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker
import net.consensys.zkevm.ethereum.coordination.aggregation.ProofAggregationCoordinatorService
import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofCoordinator
import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofUpdate
import net.consensys.zkevm.ethereum.coordination.blob.BlobZkStateProviderImpl
import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobCompressor
import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobShnarfCalculator
import net.consensys.zkevm.ethereum.coordination.blob.RollingBlobShnarfCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.BlockToBatchSubmissionCoordinator
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByBlockLimit
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByDataCompressed
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByExecutionTraces
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTargetBlockNumbers
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTimeDeadline
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationService
import net.consensys.zkevm.ethereum.coordination.conflation.ConflationServiceImpl
import net.consensys.zkevm.ethereum.coordination.conflation.DeadlineConflationCalculatorRunner
import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlobAwareConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlockConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.ProofGeneratingConflationHandlerImpl
import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCalculator
import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCoordinatorImpl
import net.consensys.zkevm.ethereum.coordination.proofcreation.ZkProofCreationCoordinatorImpl
import net.consensys.zkevm.persistence.AggregationsRepository
import net.consensys.zkevm.persistence.BatchesRepository
import net.consensys.zkevm.persistence.BlobsRepository
import net.consensys.zkevm.persistence.dao.batch.persistence.BatchProofHandlerImpl
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.web3j.protocol.Web3j
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.CompletableFuture
import kotlin.time.Duration.Companion.seconds
class ConflationApp(
val vertx: Vertx,
val batchesRepository: BatchesRepository,
val blobsRepository: BlobsRepository,
val aggregationsRepository: AggregationsRepository,
val lastFinalizedBlock: ULong,
val configs: CoordinatorConfig,
val metricsFacade: MetricsFacade,
val httpJsonRpcClientFactory: VertxHttpJsonRpcClientFactory,
) : LongRunningService {
private val log = LogManager.getLogger("conflation.app")
private val lastProcessedBlockNumber = resumeConflationFrom(
aggregationsRepository,
lastFinalizedBlock,
).get()
private val lastConsecutiveAggregatedBlockNumber = resumeAggregationFrom(
aggregationsRepository,
lastFinalizedBlock,
).get()
val l2Web3jClient: Web3j = createWeb3jHttpClient(
rpcUrl = configs.conflation.l2Endpoint.toString(),
log = LogManager.getLogger("clients.l2.eth.conflation"),
)
private val deadlineConflationCalculatorRunner = createDeadlineConflationCalculatorRunner(l2Web3jClient)
private val conflationCalculator: TracesConflationCalculator = run {
val logger = LogManager.getLogger(GlobalBlockConflationCalculator::class.java)
// To fail faster for JNA reasons
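// (GoBackedBlobCompressor is presumably backed by a native library loaded through JNA, so creating the
// instance eagerly here surfaces any native-library loading failure at startup rather than mid-run.)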
val blobCompressor = GoBackedBlobCompressor.Companion.getInstance(
compressorVersion = configs.conflation.blobCompression.blobCompressorVersion,
dataLimit = configs.conflation.blobCompression.blobSizeLimit,
metricsFacade = metricsFacade,
)
val compressedBlobCalculator = ConflationCalculatorByDataCompressed(
blobCompressor = blobCompressor,
)
val globalCalculator = GlobalBlockConflationCalculator(
lastBlockNumber = lastProcessedBlockNumber,
syncCalculators = createCalculatorsForBlobsAndConflation(logger, compressedBlobCalculator),
deferredTriggerConflationCalculators = listOfNotNull(deadlineConflationCalculatorRunner),
emptyTracesCounters = TracesCountersV2.Companion.EMPTY_TRACES_COUNT,
log = logger,
)
val batchesLimit = configs.conflation.blobCompression.batchesLimit
?: (configs.conflation.proofAggregation.proofsLimit - 1U)
GlobalBlobAwareConflationCalculator(
conflationCalculator = globalCalculator,
blobCalculator = compressedBlobCalculator,
metricsFacade = metricsFacade,
batchesLimit = batchesLimit,
)
}
private val conflationService: ConflationService =
ConflationServiceImpl(calculator = conflationCalculator, metricsFacade = metricsFacade)
private val zkStateClient: StateManagerClientV1 = StateManagerV1JsonRpcClient.Companion.create(
rpcClientFactory = httpJsonRpcClientFactory,
endpoints = configs.stateManager.endpoints.map { it.toURI() },
maxInflightRequestsPerClient = configs.stateManager.requestLimitPerEndpoint,
requestRetry = configs.stateManager.requestRetries.toJsonRpcRetry(),
zkStateManagerVersion = configs.stateManager.version,
logger = LogManager.getLogger("clients.StateManagerShomeiClient"),
)
private val proverClientFactory = ProverClientFactory(
vertx = vertx,
config = configs.proversConfig,
metricsFacade = metricsFacade,
)
private val blobCompressionProofCoordinator = run {
val maxProvenBlobCache = run {
val highestProvenBlobTracker = HighestProvenBlobTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
name = "proven.highest.block.number",
description = "Highest proven blob compression block number",
measurementSupplier = highestProvenBlobTracker,
)
highestProvenBlobTracker
}
val blobCompressionProofHandler: (BlobCompressionProofUpdate) -> SafeFuture<*> = SimpleCompositeSafeFutureHandler(
listOf(
maxProvenBlobCache,
),
)
val genesisStateProvider = GenesisStateProvider(
stateRootHash = configs.protocol.genesis.genesisStateRootHash,
shnarf = configs.protocol.genesis.genesisShnarf,
)
val blobCompressionProofCoordinator = BlobCompressionProofCoordinator(
vertx = vertx,
blobsRepository = blobsRepository,
blobCompressionProverClient = proverClientFactory.blobCompressionProverClient(),
rollingBlobShnarfCalculator = RollingBlobShnarfCalculator(
blobShnarfCalculator = GoBackedBlobShnarfCalculator(
version = ShnarfCalculatorVersion.V1_2,
metricsFacade = metricsFacade,
),
blobsRepository = blobsRepository,
genesisShnarf = genesisStateProvider.shnarf,
),
blobZkStateProvider = BlobZkStateProviderImpl(
zkStateClient = zkStateClient,
),
config = BlobCompressionProofCoordinator.Config(
pollingInterval = configs.conflation.blobCompression.handlerPollingInterval,
),
blobCompressionProofHandler = blobCompressionProofHandler,
metricsFacade = metricsFacade,
)
val highestUnprovenBlobTracker = HighestUnprovenBlobTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
name = "unproven.highest.block.number",
description = "Block number of highest unproven blob produced",
measurementSupplier = highestUnprovenBlobTracker,
)
val compositeSafeFutureHandler = SimpleCompositeSafeFutureHandler(
listOf(
blobCompressionProofCoordinator::handleBlob,
highestUnprovenBlobTracker,
),
)
conflationCalculator.onBlobCreation(compositeSafeFutureHandler)
blobCompressionProofCoordinator
}
private val proofAggregationCoordinatorService: LongRunningService = run {
val maxBlobEndBlockNumberTracker = ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker(
aggregationsRepository,
lastProcessedBlockNumber,
)
metricsFacade.createGauge(
category = LineaMetricsCategory.BLOB,
name = "proven.highest.consecutive.block.number",
description = "Highest consecutive proven blob compression block number",
measurementSupplier = maxBlobEndBlockNumberTracker,
)
val highestAggregationTracker = HighestULongTracker(lastConsecutiveAggregatedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.AGGREGATION,
name = "proven.highest.block.number",
description = "Highest proven aggregation block number",
measurementSupplier = highestAggregationTracker,
)
ProofAggregationCoordinatorService.Companion
.create(
vertx = vertx,
aggregationCoordinatorPollingInterval = configs.conflation.proofAggregation.coordinatorPollingInterval,
deadlineCheckInterval = configs.conflation.proofAggregation.deadlineCheckInterval,
aggregationDeadline = configs.conflation.proofAggregation.deadline,
latestBlockProvider = GethCliqueSafeBlockProvider(
web3j = l2Web3jClient,
config = GethCliqueSafeBlockProvider.Config(0),
),
maxProofsPerAggregation = configs.conflation.proofAggregation.proofsLimit,
startBlockNumberInclusive = lastConsecutiveAggregatedBlockNumber + 1u,
aggregationsRepository = aggregationsRepository,
consecutiveProvenBlobsProvider = maxBlobEndBlockNumberTracker,
proofAggregationClient = proverClientFactory.proofAggregationProverClient(),
l2EthApiClient = createEthApiClient(
l2Web3jClient,
requestRetryConfig = RetryConfig(
backoffDelay = 1.seconds,
failuresWarningThreshold = 3u,
),
vertx = vertx,
),
l2MessageService = Web3JL2MessageServiceSmartContractClient.Companion.createReadOnly(
web3jClient = l2Web3jClient,
contractAddress = configs.protocol.l2.contractAddress,
smartContractErrors = configs.smartContractErrors,
smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(),
),
aggregationDeadlineDelay = configs.conflation.conflationDeadlineLastBlockConfirmationDelay,
targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks ?: emptyList(),
metricsFacade = metricsFacade,
provenAggregationEndBlockNumberConsumer = { aggEndBlockNumber -> highestAggregationTracker(aggEndBlockNumber) },
aggregationSizeMultipleOf = configs.conflation.proofAggregation.aggregationSizeMultipleOf,
)
}
private val block2BatchCoordinator = run {
val tracesCountersClient = run {
val tracesCountersLog = LogManager.getLogger("clients.traces.counters")
TracesGeneratorJsonRpcClientV2(
vertx = vertx,
rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing(
endpoints = configs.traces.counters.endpoints.toSet(),
maxInflightRequestsPerClient = configs.traces.counters.requestLimitPerEndpoint,
log = tracesCountersLog,
),
config = TracesGeneratorJsonRpcClientV2.Config(
expectedTracesApiVersion = configs.traces.expectedTracesApiVersion,
),
retryConfig = configs.traces.counters.requestRetries.toJsonRpcRetry(),
log = tracesCountersLog,
)
}
val tracesConflationClient = run {
val tracesConflationLog = LogManager.getLogger("clients.traces.conflation")
TracesGeneratorJsonRpcClientV2(
vertx = vertx,
rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing(
endpoints = configs.traces.conflation.endpoints.toSet(),
maxInflightRequestsPerClient = configs.traces.conflation.requestLimitPerEndpoint,
log = tracesConflationLog,
),
config = TracesGeneratorJsonRpcClientV2.Config(
expectedTracesApiVersion = configs.traces.expectedTracesApiVersion,
),
retryConfig = configs.traces.conflation.requestRetries.toJsonRpcRetry(),
log = tracesConflationLog,
)
}
val blobsConflationHandler: (BlocksConflation) -> SafeFuture<*> = run {
val maxProvenBatchCache = run {
val highestProvenBatchTracker = HighestProvenBatchTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.BATCH,
name = "proven.highest.block.number",
description = "Highest proven batch execution block number",
measurementSupplier = highestProvenBatchTracker,
)
highestProvenBatchTracker
}
val batchProofHandler = SimpleCompositeSafeFutureHandler(
listOf(
maxProvenBatchCache,
BatchProofHandlerImpl(batchesRepository)::acceptNewBatch,
),
)
val executionProverClient: ExecutionProverClientV2 = proverClientFactory.executionProverClient(
// we cannot use configs.traces.expectedTracesApiVersion because it breaks the prover's expected version pattern
tracesVersion = "2.1.0",
stateManagerVersion = configs.stateManager.version,
)
val proofGeneratingConflationHandlerImpl = ProofGeneratingConflationHandlerImpl(
tracesProductionCoordinator = TracesConflationCoordinatorImpl(tracesConflationClient, zkStateClient),
zkProofProductionCoordinator = ZkProofCreationCoordinatorImpl(
executionProverClient = executionProverClient,
l2EthApiClient = createEthApiClient(
rpcUrl = configs.conflation.l2Endpoint.toString(),
log = LogManager.getLogger("clients.l2.eth.conflation"),
requestRetryConfig = configs.conflation.l2RequestRetries,
vertx = vertx,
),
messageServiceAddress = configs.protocol.l2.contractAddress,
),
batchProofHandler = batchProofHandler,
vertx = vertx,
config = ProofGeneratingConflationHandlerImpl.Config(5.seconds),
)
val highestConflationTracker = HighestConflationTracker(lastProcessedBlockNumber)
metricsFacade.createGauge(
category = LineaMetricsCategory.CONFLATION,
name = "last.block.number",
description = "Last conflated block number",
measurementSupplier = highestConflationTracker,
)
val conflationsCounter = metricsFacade.createCounter(
category = LineaMetricsCategory.CONFLATION,
name = "counter",
description = "Counter of new conflations",
)
SimpleCompositeSafeFutureHandler(
listOf(
proofGeneratingConflationHandlerImpl::handleConflatedBatch,
highestConflationTracker,
{
conflationsCounter.increment()
SafeFuture.COMPLETE
},
),
)
}
conflationService.onConflatedBatch(blobsConflationHandler)
BlockToBatchSubmissionCoordinator(
conflationService = conflationService,
tracesCountersClient = tracesCountersClient,
vertx = vertx,
encoder = BlockRLPEncoder,
)
}
private val lastProvenBlockNumberProvider = run {
val lastProvenConsecutiveBatchBlockNumberProvider = BatchesRepoBasedLastProvenBlockNumberProvider(
lastProcessedBlockNumber.toLong(),
batchesRepository,
)
metricsFacade.createGauge(
category = LineaMetricsCategory.BATCH,
name = "proven.highest.consecutive.block.number",
description = "Highest proven consecutive execution batch block number",
measurementSupplier = { lastProvenConsecutiveBatchBlockNumberProvider.getLastKnownProvenBlockNumber() },
)
lastProvenConsecutiveBatchBlockNumberProvider
}
private val blockCreationMonitor = run {
log.info("Resuming conflation from block={} inclusive", lastProcessedBlockNumber + 1UL)
val blockCreationMonitor = BlockCreationMonitor(
vertx = vertx,
web3j = ExtendedWeb3JImpl(l2Web3jClient),
startingBlockNumberExclusive = lastProcessedBlockNumber.toLong(),
blockCreationListener = block2BatchCoordinator,
lastProvenBlockNumberProviderAsync = lastProvenBlockNumberProvider,
config = BlockCreationMonitor.Config(
pollingInterval = configs.conflation.blocksPollingInterval,
blocksToFinalization = 0L,
blocksFetchLimit = configs.conflation.l2FetchBlocksLimit.toLong(),
// We need to add 1 to forceStopConflationAtBlockInclusive because conflation calculator requires
// block_number = forceStopConflationAtBlockInclusive + 1 to trigger conflation at
// forceStopConflationAtBlockInclusive
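// e.g. with forceStopConflationAtBlockInclusive = 100, the monitor must still fetch block 101 so the
// calculator can emit the conflation that ends exactly at block 100.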
lastL2BlockNumberToProcessInclusive = configs.conflation.forceStopConflationAtBlockInclusive?.inc(),
),
)
blockCreationMonitor
}
override fun start(): CompletableFuture<Unit> {
return cleanupDbDataAfterBlockNumbers(
lastProcessedBlockNumber = lastProcessedBlockNumber,
lastConsecutiveAggregatedBlockNumber = lastConsecutiveAggregatedBlockNumber,
batchesRepository = batchesRepository,
blobsRepository = blobsRepository,
aggregationsRepository = aggregationsRepository,
)
.thenCompose { proofAggregationCoordinatorService.start() }
.thenCompose { deadlineConflationCalculatorRunner?.start() ?: SafeFuture.completedFuture(Unit) }
.thenCompose { blockCreationMonitor.start() }
.thenCompose { blobCompressionProofCoordinator.start() }
.thenPeek {
log.info("Conflation started")
}
}
override fun stop(): CompletableFuture<Unit> {
return SafeFuture.allOf(
proofAggregationCoordinatorService.stop(),
blockCreationMonitor.stop(),
deadlineConflationCalculatorRunner?.stop() ?: SafeFuture.completedFuture(Unit),
blobCompressionProofCoordinator.stop(),
)
.thenApply { log.info("Conflation Stopped") }
}
fun updateLatestL1FinalizedBlock(blockNumber: Long): SafeFuture<Unit> {
return lastProvenBlockNumberProvider.updateLatestL1FinalizedBlock(blockNumber)
}
private fun createDeadlineConflationCalculatorRunner(
l2Web3jClient: Web3j,
): DeadlineConflationCalculatorRunner? {
if (configs.conflation.isDisabled() || configs.conflation.conflationDeadline == null) {
log.info("Conflation deadline calculator is disabled")
return null
}
return DeadlineConflationCalculatorRunner(
conflationDeadlineCheckInterval = configs.conflation.conflationDeadlineCheckInterval,
delegate = ConflationCalculatorByTimeDeadline(
config = ConflationCalculatorByTimeDeadline.Config(
conflationDeadline = configs.conflation.conflationDeadline,
conflationDeadlineLastBlockConfirmationDelay =
configs.conflation.conflationDeadlineLastBlockConfirmationDelay,
),
lastBlockNumber = lastProcessedBlockNumber,
clock = Clock.System,
latestBlockProvider = GethCliqueSafeBlockProvider(
l2Web3jClient,
GethCliqueSafeBlockProvider.Config(blocksToFinalization = 0),
),
),
)
}
private fun addBlocksLimitCalculatorIfDefined(calculators: MutableList<ConflationCalculator>) {
if (configs.conflation.blocksLimit != null) {
calculators.add(
ConflationCalculatorByBlockLimit(
blockLimit = configs.conflation.blocksLimit,
),
)
}
}
private fun addTargetEndBlockConflationCalculatorIfDefined(calculators: MutableList<ConflationCalculator>) {
if (configs.conflation.proofAggregation.targetEndBlocks?.isNotEmpty() ?: false) {
calculators.add(
ConflationCalculatorByTargetBlockNumbers(
targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks!!.toSet(),
),
)
}
}
private fun createCalculatorsForBlobsAndConflation(
logger: Logger,
compressedBlobCalculator: ConflationCalculatorByDataCompressed,
): List<ConflationCalculator> {
val calculators: MutableList<ConflationCalculator> =
mutableListOf(
ConflationCalculatorByExecutionTraces(
tracesCountersLimit = configs.conflation.tracesLimitsV2,
emptyTracesCounters = TracesCountersV2.Companion.EMPTY_TRACES_COUNT,
metricsFacade = metricsFacade,
log = logger,
),
compressedBlobCalculator,
)
addBlocksLimitCalculatorIfDefined(calculators)
addTargetEndBlockConflationCalculatorIfDefined(calculators)
return calculators
}
}

@@ -0,0 +1,54 @@
package net.consensys.zkevm.coordinator.app.conflation
import net.consensys.zkevm.persistence.AggregationsRepository
import net.consensys.zkevm.persistence.BatchesRepository
import net.consensys.zkevm.persistence.BlobsRepository
import tech.pegasys.teku.infrastructure.async.SafeFuture
object ConflationAppHelper {
/**
* Returns the last block number inclusive up to which we have consecutive proven blobs or the last finalized block
* number inclusive
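* For example, with lastFinalizedBlock = 100 and consecutive proven blobs covering blocks 101..150 this
* resolves to 150; with no consecutive proven blobs after block 100 it falls back to 100.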
*/
fun resumeConflationFrom(
aggregationsRepository: AggregationsRepository,
lastFinalizedBlock: ULong,
): SafeFuture<ULong> {
return aggregationsRepository
.findConsecutiveProvenBlobs(lastFinalizedBlock.toLong() + 1)
.thenApply { blobAndBatchCounters ->
if (blobAndBatchCounters.isNotEmpty()) {
blobAndBatchCounters.last().blobCounters.endBlockNumber
} else {
lastFinalizedBlock
}
}
}
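/**
* Returns the highest consecutive aggregated end block number found after lastFinalizedBlock,
* or lastFinalizedBlock itself when no such aggregation exists yet.
*/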
fun resumeAggregationFrom(
aggregationsRepository: AggregationsRepository,
lastFinalizedBlock: ULong,
): SafeFuture<ULong> {
return aggregationsRepository
.findHighestConsecutiveEndBlockNumber(lastFinalizedBlock.toLong() + 1)
.thenApply { highestEndBlockNumber ->
highestEndBlockNumber?.toULong() ?: lastFinalizedBlock
}
}
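/**
* Deletes batches and blobs with block numbers above lastProcessedBlockNumber, and aggregations with
* block numbers above lastConsecutiveAggregatedBlockNumber, so processing can resume from a consistent state.
*/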
fun cleanupDbDataAfterBlockNumbers(
lastProcessedBlockNumber: ULong,
lastConsecutiveAggregatedBlockNumber: ULong,
batchesRepository: BatchesRepository,
blobsRepository: BlobsRepository,
aggregationsRepository: AggregationsRepository,
): SafeFuture<*> {
val blockNumberInclusiveToDeleteFrom = lastProcessedBlockNumber + 1u
val cleanupBatches = batchesRepository.deleteBatchesAfterBlockNumber(blockNumberInclusiveToDeleteFrom.toLong())
val cleanupBlobs = blobsRepository.deleteBlobsAfterBlockNumber(blockNumberInclusiveToDeleteFrom)
val cleanupAggregations = aggregationsRepository
.deleteAggregationsAfterBlockNumber((lastConsecutiveAggregatedBlockNumber + 1u).toLong())
return SafeFuture.allOf(cleanupBatches, cleanupBlobs, cleanupAggregations)
}
}

@@ -1,5 +1,7 @@
package net.consensys.zkevm.coordinator.app
import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.cleanupDbDataAfterBlockNumbers
import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeConflationFrom
import net.consensys.zkevm.persistence.AggregationsRepository
import net.consensys.zkevm.persistence.BatchesRepository
import net.consensys.zkevm.persistence.BlobsRepository
@@ -11,7 +13,7 @@ import org.mockito.Mockito.verify
import org.mockito.kotlin.whenever
import tech.pegasys.teku.infrastructure.async.SafeFuture
class L1DependentAppTest {
class ConflationAppTest {
@Test
fun `test resume conflation from uses lastFinalizedBlock + 1 for db queries`() {
val aggregationsRepository = mock<AggregationsRepository>()
@@ -21,7 +23,7 @@ class L1DependentAppTest {
.thenReturn(SafeFuture.completedFuture(emptyList()))
val lastProcessedBlock =
L1DependentApp.resumeConflationFrom(
resumeConflationFrom(
aggregationsRepository,
lastFinalizedBlock,
).get()
@@ -44,7 +46,7 @@ class L1DependentAppTest {
whenever(aggregationsRepository.deleteAggregationsAfterBlockNumber(anyLong()))
.thenReturn(SafeFuture.completedFuture(0))
L1DependentApp.cleanupDbDataAfterBlockNumbers(
cleanupDbDataAfterBlockNumbers(
lastProcessedBlockNumber = lastProcessedBlock,
lastConsecutiveAggregatedBlockNumber = lastConsecutiveAggregatedBlockNumber,
batchesRepository = batchesRepository,