From 8461bde17e65193f57461ba28c2f76176c894e17 Mon Sep 17 00:00:00 2001 From: Fluent Crafter <205769460+fluentcrafter@users.noreply.github.com> Date: Thu, 12 Jun 2025 09:17:02 +0100 Subject: [PATCH] coordinator: carveout conflation app (#1138) * coordinator: carvout conflation app * coordinator: stop conflation app * coordinator: fix client setup * coordinator: fix spotless --- ...rdinator-config-v2-override-local-dev.toml | 1 + .../zkevm/coordinator/app/L1DependentApp.kt | 579 +----------------- .../app/MessageAnchoringAppConfigurator.kt | 71 +++ .../app/conflation/ConflationApp.kt | 509 +++++++++++++++ .../app/conflation/ConflationAppHelper.kt | 54 ++ ...pendentAppTest.kt => ConflationAppTest.kt} | 8 +- 6 files changed, 662 insertions(+), 560 deletions(-) create mode 100644 coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/MessageAnchoringAppConfigurator.kt create mode 100644 coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationApp.kt create mode 100644 coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationAppHelper.kt rename coordinator/app/src/test/kotlin/net/consensys/zkevm/coordinator/app/{L1DependentAppTest.kt => ConflationAppTest.kt} (89%) diff --git a/config/coordinator/coordinator-config-v2-override-local-dev.toml b/config/coordinator/coordinator-config-v2-override-local-dev.toml index 6b6b1bdd..0abb5f42 100644 --- a/config/coordinator/coordinator-config-v2-override-local-dev.toml +++ b/config/coordinator/coordinator-config-v2-override-local-dev.toml @@ -24,6 +24,7 @@ endpoints = ["http://127.0.0.1:8998/"] [type2-state-proof-provider] disabled = true +endpoints = ["http://127.0.0.1:8889/"] [l1-finalization-monitor] l1-query-block-tag="LATEST" diff --git a/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/L1DependentApp.kt b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/L1DependentApp.kt index 3d0fc94d..1eb9506b 100644 --- a/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/L1DependentApp.kt +++ b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/L1DependentApp.kt @@ -1,22 +1,16 @@ package net.consensys.zkevm.coordinator.app -import build.linea.clients.StateManagerClientV1 -import build.linea.clients.StateManagerV1JsonRpcClient import io.vertx.core.Vertx import io.vertx.sqlclient.SqlClient import kotlinx.datetime.Clock -import linea.anchoring.MessageAnchoringApp -import linea.blob.ShnarfCalculatorVersion import linea.contract.l1.LineaRollupSmartContractClientReadOnly import linea.contract.l1.Web3JLineaRollupSmartContractClientReadOnly -import linea.contract.l2.Web3JL2MessageServiceSmartContractClient import linea.coordinator.config.toJsonRpcRetry import linea.coordinator.config.v2.CoordinatorConfig import linea.coordinator.config.v2.isDisabled import linea.coordinator.config.v2.isEnabled import linea.domain.BlockNumberAndHash import linea.domain.RetryConfig -import linea.encoding.BlockRLPEncoder import linea.kotlin.toKWeiUInt import linea.web3j.ExtendedWeb3JImpl import linea.web3j.SmartContractErrors @@ -24,7 +18,6 @@ import linea.web3j.Web3jBlobExtended import linea.web3j.createWeb3jHttpClient import linea.web3j.createWeb3jHttpService import linea.web3j.ethapi.createEthApiClient -import net.consensys.linea.contract.l1.GenesisStateProvider import net.consensys.linea.ethereum.gaspricing.BoundableFeeCalculator import net.consensys.linea.ethereum.gaspricing.FeesCalculator import net.consensys.linea.ethereum.gaspricing.FeesFetcher @@ -45,53 +38,18 @@ import net.consensys.linea.ethereum.gaspricing.staticcap.VariableFeesCalculator import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory import net.consensys.linea.metrics.LineaMetricsCategory import net.consensys.linea.metrics.MetricsFacade -import net.consensys.linea.traces.TracesCountersV2 import net.consensys.zkevm.LongRunningService -import net.consensys.zkevm.coordinator.blockcreation.BatchesRepoBasedLastProvenBlockNumberProvider -import net.consensys.zkevm.coordinator.blockcreation.BlockCreationMonitor -import net.consensys.zkevm.coordinator.blockcreation.GethCliqueSafeBlockProvider -import net.consensys.zkevm.coordinator.clients.ExecutionProverClientV2 +import net.consensys.zkevm.coordinator.app.conflation.ConflationApp +import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeConflationFrom import net.consensys.zkevm.coordinator.clients.ShomeiClient -import net.consensys.zkevm.coordinator.clients.TracesGeneratorJsonRpcClientV2 -import net.consensys.zkevm.coordinator.clients.prover.ProverClientFactory import net.consensys.zkevm.coordinator.clients.smartcontract.LineaRollupSmartContractClient import net.consensys.zkevm.domain.BlobSubmittedEvent -import net.consensys.zkevm.domain.BlocksConflation import net.consensys.zkevm.domain.FinalizationSubmittedEvent import net.consensys.zkevm.ethereum.coordination.EventDispatcher -import net.consensys.zkevm.ethereum.coordination.HighestConflationTracker -import net.consensys.zkevm.ethereum.coordination.HighestProvenBatchTracker -import net.consensys.zkevm.ethereum.coordination.HighestProvenBlobTracker import net.consensys.zkevm.ethereum.coordination.HighestULongTracker -import net.consensys.zkevm.ethereum.coordination.HighestUnprovenBlobTracker import net.consensys.zkevm.ethereum.coordination.LatestBlobSubmittedBlockNumberTracker import net.consensys.zkevm.ethereum.coordination.LatestFinalizationSubmittedBlockNumberTracker -import net.consensys.zkevm.ethereum.coordination.SimpleCompositeSafeFutureHandler -import net.consensys.zkevm.ethereum.coordination.aggregation.ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker -import net.consensys.zkevm.ethereum.coordination.aggregation.ProofAggregationCoordinatorService -import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofCoordinator -import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofUpdate -import net.consensys.zkevm.ethereum.coordination.blob.BlobZkStateProviderImpl -import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobCompressor -import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobShnarfCalculator -import net.consensys.zkevm.ethereum.coordination.blob.RollingBlobShnarfCalculator import net.consensys.zkevm.ethereum.coordination.blockcreation.ForkChoiceUpdaterImpl -import net.consensys.zkevm.ethereum.coordination.conflation.BlockToBatchSubmissionCoordinator -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculator -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByBlockLimit -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByDataCompressed -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByExecutionTraces -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTargetBlockNumbers -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTimeDeadline -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationService -import net.consensys.zkevm.ethereum.coordination.conflation.ConflationServiceImpl -import net.consensys.zkevm.ethereum.coordination.conflation.DeadlineConflationCalculatorRunner -import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlobAwareConflationCalculator -import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlockConflationCalculator -import net.consensys.zkevm.ethereum.coordination.conflation.ProofGeneratingConflationHandlerImpl -import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCalculator -import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCoordinatorImpl -import net.consensys.zkevm.ethereum.coordination.proofcreation.ZkProofCreationCoordinatorImpl import net.consensys.zkevm.ethereum.finalization.AggregationFinalizationCoordinator import net.consensys.zkevm.ethereum.finalization.AggregationSubmitterImpl import net.consensys.zkevm.ethereum.finalization.FinalizationHandler @@ -103,11 +61,9 @@ import net.consensys.zkevm.persistence.AggregationsRepository import net.consensys.zkevm.persistence.BatchesRepository import net.consensys.zkevm.persistence.BlobsRepository import net.consensys.zkevm.persistence.dao.aggregation.RecordsCleanupFinalizationHandler -import net.consensys.zkevm.persistence.dao.batch.persistence.BatchProofHandlerImpl import net.consensys.zkevm.persistence.dao.feehistory.FeeHistoriesPostgresDao import net.consensys.zkevm.persistence.dao.feehistory.FeeHistoriesRepositoryImpl import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.Logger import org.web3j.protocol.Web3j import tech.pegasys.teku.infrastructure.async.SafeFuture import java.util.concurrent.CompletableFuture @@ -139,10 +95,6 @@ class L1DependentApp( log.warn("L1 submission disabled for aggregations") } } - - if (configs.messageAnchoring.isDisabled()) { - log.warn("Message anchoring is disabled") - } if (configs.l2NetworkGasPricing.isDisabled()) { log.warn("L2 Network dynamic gas pricing is disabled") } @@ -160,12 +112,6 @@ class L1DependentApp( ).getChainId().get() } - private val proverClientFactory = ProverClientFactory( - vertx = vertx, - config = configs.proversConfig, - metricsFacade = metricsFacade, - ) - private val finalizationTransactionManager = createTransactionManager( vertx = vertx, signerConfig = configs.l1Submission!!.aggregation.signer, @@ -344,122 +290,6 @@ class L1DependentApp( aggregationsRepository, lastFinalizedBlock, ).get() - private val lastConsecutiveAggregatedBlockNumber = resumeAggregationFrom( - aggregationsRepository, - lastFinalizedBlock, - ).get() - - val l2Web3jClientForBlockCreation: Web3j = createWeb3jHttpClient( - rpcUrl = configs.conflation.l2Endpoint.toString(), - log = LogManager.getLogger("clients.l2.eth.conflation"), - ) - - private fun createDeadlineConflationCalculatorRunner(): DeadlineConflationCalculatorRunner? { - if (configs.conflation.isDisabled() || configs.conflation.conflationDeadline == null) { - log.info("Conflation deadline calculator is disabled") - return null - } - - return DeadlineConflationCalculatorRunner( - conflationDeadlineCheckInterval = configs.conflation.conflationDeadlineCheckInterval, - delegate = ConflationCalculatorByTimeDeadline( - config = ConflationCalculatorByTimeDeadline.Config( - conflationDeadline = configs.conflation.conflationDeadline, - conflationDeadlineLastBlockConfirmationDelay = - configs.conflation.conflationDeadlineLastBlockConfirmationDelay, - ), - lastBlockNumber = lastProcessedBlockNumber, - clock = Clock.System, - latestBlockProvider = GethCliqueSafeBlockProvider( - l2Web3jClientForBlockCreation, - GethCliqueSafeBlockProvider.Config(blocksToFinalization = 0), - ), - ), - ) - } - - private val deadlineConflationCalculatorRunner = createDeadlineConflationCalculatorRunner() - - private fun addBlocksLimitCalculatorIfDefined(calculators: MutableList) { - if (configs.conflation.blocksLimit != null) { - calculators.add( - ConflationCalculatorByBlockLimit( - blockLimit = configs.conflation.blocksLimit, - ), - ) - } - } - - private fun addTargetEndBlockConflationCalculatorIfDefined(calculators: MutableList) { - if (configs.conflation.proofAggregation.targetEndBlocks?.isNotEmpty() ?: false) { - calculators.add( - ConflationCalculatorByTargetBlockNumbers( - targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks!!.toSet(), - ), - ) - } - } - - private fun createCalculatorsForBlobsAndConflation( - logger: Logger, - compressedBlobCalculator: ConflationCalculatorByDataCompressed, - ): List { - val calculators: MutableList = - mutableListOf( - ConflationCalculatorByExecutionTraces( - tracesCountersLimit = configs.conflation.tracesLimitsV2, - emptyTracesCounters = TracesCountersV2.EMPTY_TRACES_COUNT, - metricsFacade = metricsFacade, - log = logger, - ), - compressedBlobCalculator, - ) - addBlocksLimitCalculatorIfDefined(calculators) - addTargetEndBlockConflationCalculatorIfDefined(calculators) - return calculators - } - - private val conflationCalculator: TracesConflationCalculator = run { - val logger = LogManager.getLogger(GlobalBlockConflationCalculator::class.java) - - // To fail faster for JNA reasons - val blobCompressor = GoBackedBlobCompressor.getInstance( - compressorVersion = configs.conflation.blobCompression.blobCompressorVersion, - dataLimit = configs.conflation.blobCompression.blobSizeLimit, - metricsFacade = metricsFacade, - ) - - val compressedBlobCalculator = ConflationCalculatorByDataCompressed( - blobCompressor = blobCompressor, - ) - val globalCalculator = GlobalBlockConflationCalculator( - lastBlockNumber = lastProcessedBlockNumber, - syncCalculators = createCalculatorsForBlobsAndConflation(logger, compressedBlobCalculator), - deferredTriggerConflationCalculators = listOfNotNull(deadlineConflationCalculatorRunner), - emptyTracesCounters = TracesCountersV2.EMPTY_TRACES_COUNT, - log = logger, - ) - - val batchesLimit = configs.conflation.blobCompression.batchesLimit - ?: (configs.conflation.proofAggregation.proofsLimit - 1U) - GlobalBlobAwareConflationCalculator( - conflationCalculator = globalCalculator, - blobCalculator = compressedBlobCalculator, - metricsFacade = metricsFacade, - batchesLimit = batchesLimit, - ) - } - private val conflationService: ConflationService = - ConflationServiceImpl(calculator = conflationCalculator, metricsFacade = metricsFacade) - - private val zkStateClient: StateManagerClientV1 = StateManagerV1JsonRpcClient.create( - rpcClientFactory = httpJsonRpcClientFactory, - endpoints = configs.stateManager.endpoints.map { it.toURI() }, - maxInflightRequestsPerClient = configs.stateManager.requestLimitPerEndpoint, - requestRetry = configs.stateManager.requestRetries.toJsonRpcRetry(), - zkStateManagerVersion = configs.stateManager.version, - logger = LogManager.getLogger("clients.StateManagerShomeiClient"), - ) private val lineaSmartContractClientForDataSubmission: LineaRollupSmartContractClient = run { // The below gas provider will act as the primary gas provider if L1 @@ -496,67 +326,6 @@ class L1DependentApp( ) } - private val genesisStateProvider = GenesisStateProvider( - stateRootHash = configs.protocol.genesis.genesisStateRootHash, - shnarf = configs.protocol.genesis.genesisShnarf, - ) - - private val blobCompressionProofCoordinator = run { - val maxProvenBlobCache = run { - val highestProvenBlobTracker = HighestProvenBlobTracker(lastProcessedBlockNumber) - metricsFacade.createGauge( - category = LineaMetricsCategory.BLOB, - name = "proven.highest.block.number", - description = "Highest proven blob compression block number", - measurementSupplier = highestProvenBlobTracker, - ) - highestProvenBlobTracker - } - val blobCompressionProofHandler: (BlobCompressionProofUpdate) -> SafeFuture<*> = SimpleCompositeSafeFutureHandler( - listOf( - maxProvenBlobCache, - ), - ) - - val blobCompressionProofCoordinator = BlobCompressionProofCoordinator( - vertx = vertx, - blobsRepository = blobsRepository, - blobCompressionProverClient = proverClientFactory.blobCompressionProverClient(), - rollingBlobShnarfCalculator = RollingBlobShnarfCalculator( - blobShnarfCalculator = GoBackedBlobShnarfCalculator( - version = ShnarfCalculatorVersion.V1_2, - metricsFacade = metricsFacade, - ), - blobsRepository = blobsRepository, - genesisShnarf = genesisStateProvider.shnarf, - ), - blobZkStateProvider = BlobZkStateProviderImpl( - zkStateClient = zkStateClient, - ), - config = BlobCompressionProofCoordinator.Config( - pollingInterval = configs.conflation.blobCompression.handlerPollingInterval, - ), - blobCompressionProofHandler = blobCompressionProofHandler, - metricsFacade = metricsFacade, - ) - val highestUnprovenBlobTracker = HighestUnprovenBlobTracker(lastProcessedBlockNumber) - metricsFacade.createGauge( - category = LineaMetricsCategory.BLOB, - name = "unproven.highest.block.number", - description = "Block number of highest unproven blob produced", - measurementSupplier = highestUnprovenBlobTracker, - ) - - val compositeSafeFutureHandler = SimpleCompositeSafeFutureHandler( - listOf( - blobCompressionProofCoordinator::handleBlob, - highestUnprovenBlobTracker, - ), - ) - conflationCalculator.onBlobCreation(compositeSafeFutureHandler) - blobCompressionProofCoordinator - } - private val highestAcceptedBlobTracker = HighestULongTracker(lastProcessedBlockNumber).also { metricsFacade.createGauge( category = LineaMetricsCategory.BLOB, @@ -572,11 +341,11 @@ class L1DependentApp( acceptedBlobEndBlockNumberConsumer = { highestAcceptedBlobTracker(it) }, ) - private val latestBlobSubmittedBlockNumberTracker = LatestBlobSubmittedBlockNumberTracker(0UL) private val blobSubmissionCoordinator = run { if (configs.l1Submission.isDisabled() || configs.l1Submission!!.blob.isDisabled()) { DisabledLongRunningService } else { + val latestBlobSubmittedBlockNumberTracker = LatestBlobSubmittedBlockNumberTracker(0UL) metricsFacade.createGauge( category = LineaMetricsCategory.BLOB, name = "highest.submitted.on.l1", @@ -621,69 +390,6 @@ class L1DependentApp( } } - private val proofAggregationCoordinatorService: LongRunningService = run { - val maxBlobEndBlockNumberTracker = ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker( - aggregationsRepository, - lastProcessedBlockNumber, - ) - - metricsFacade.createGauge( - category = LineaMetricsCategory.BLOB, - name = "proven.highest.consecutive.block.number", - description = "Highest consecutive proven blob compression block number", - measurementSupplier = maxBlobEndBlockNumberTracker, - ) - - val highestAggregationTracker = HighestULongTracker(lastConsecutiveAggregatedBlockNumber) - metricsFacade.createGauge( - category = LineaMetricsCategory.AGGREGATION, - name = "proven.highest.block.number", - description = "Highest proven aggregation block number", - measurementSupplier = highestAggregationTracker, - ) - - val l2Web3jClient = createWeb3jHttpClient( - rpcUrl = configs.conflation.l2Endpoint.toString(), - log = LogManager.getLogger("clients.l2.eth.conflation"), - ) - - ProofAggregationCoordinatorService - .create( - vertx = vertx, - aggregationCoordinatorPollingInterval = configs.conflation.proofAggregation.coordinatorPollingInterval, - deadlineCheckInterval = configs.conflation.proofAggregation.deadlineCheckInterval, - aggregationDeadline = configs.conflation.proofAggregation.deadline, - latestBlockProvider = GethCliqueSafeBlockProvider( - web3j = l2Web3jClient, - config = GethCliqueSafeBlockProvider.Config(0), - ), - maxProofsPerAggregation = configs.conflation.proofAggregation.proofsLimit, - startBlockNumberInclusive = lastConsecutiveAggregatedBlockNumber + 1u, - aggregationsRepository = aggregationsRepository, - consecutiveProvenBlobsProvider = maxBlobEndBlockNumberTracker, - proofAggregationClient = proverClientFactory.proofAggregationProverClient(), - l2EthApiClient = createEthApiClient( - l2Web3jClient, - requestRetryConfig = linea.domain.RetryConfig( - backoffDelay = 1.seconds, - failuresWarningThreshold = 3u, - ), - vertx = vertx, - ), - l2MessageService = Web3JL2MessageServiceSmartContractClient.createReadOnly( - web3jClient = l2Web3jClient, - contractAddress = configs.protocol.l2.contractAddress, - smartContractErrors = smartContractErrors, - smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(), - ), - aggregationDeadlineDelay = configs.conflation.conflationDeadlineLastBlockConfirmationDelay, - targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks ?: emptyList(), - metricsFacade = metricsFacade, - provenAggregationEndBlockNumberConsumer = { aggEndBlockNumber -> highestAggregationTracker(aggEndBlockNumber) }, - aggregationSizeMultipleOf = configs.conflation.proofAggregation.aggregationSizeMultipleOf, - ) - } - private val aggregationFinalizationCoordinator = run { if (configs.l1Submission.isDisabled() || configs.l1Submission?.aggregation.isDisabled()) { DisabledLongRunningService @@ -761,152 +467,6 @@ class L1DependentApp( } } - private val block2BatchCoordinator = run { - val tracesCountersClient = run { - val tracesCountersLog = LogManager.getLogger("clients.traces.counters") - TracesGeneratorJsonRpcClientV2( - vertx = vertx, - rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing( - endpoints = configs.traces.counters.endpoints.toSet(), - maxInflightRequestsPerClient = configs.traces.counters.requestLimitPerEndpoint, - log = tracesCountersLog, - ), - config = TracesGeneratorJsonRpcClientV2.Config( - expectedTracesApiVersion = configs.traces.expectedTracesApiVersion, - ), - retryConfig = configs.traces.counters.requestRetries.toJsonRpcRetry(), - log = tracesCountersLog, - ) - } - - val tracesConflationClient = run { - val tracesConflationLog = LogManager.getLogger("clients.traces.conflation") - TracesGeneratorJsonRpcClientV2( - vertx = vertx, - rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing( - endpoints = configs.traces.conflation.endpoints.toSet(), - maxInflightRequestsPerClient = configs.traces.conflation.requestLimitPerEndpoint, - log = tracesConflationLog, - ), - config = TracesGeneratorJsonRpcClientV2.Config( - expectedTracesApiVersion = configs.traces.expectedTracesApiVersion, - ), - retryConfig = configs.traces.conflation.requestRetries.toJsonRpcRetry(), - log = tracesConflationLog, - ) - } - - val blobsConflationHandler: (BlocksConflation) -> SafeFuture<*> = run { - val maxProvenBatchCache = run { - val highestProvenBatchTracker = HighestProvenBatchTracker(lastProcessedBlockNumber) - metricsFacade.createGauge( - category = LineaMetricsCategory.BATCH, - name = "proven.highest.block.number", - description = "Highest proven batch execution block number", - measurementSupplier = highestProvenBatchTracker, - ) - highestProvenBatchTracker - } - - val batchProofHandler = SimpleCompositeSafeFutureHandler( - listOf( - maxProvenBatchCache, - BatchProofHandlerImpl(batchesRepository)::acceptNewBatch, - ), - ) - val executionProverClient: ExecutionProverClientV2 = proverClientFactory.executionProverClient( - // we cannot use configs.traces.expectedTracesApiVersion because it breaks prover expected version pattern - tracesVersion = "2.1.0", - stateManagerVersion = configs.stateManager.version, - ) - - val proofGeneratingConflationHandlerImpl = ProofGeneratingConflationHandlerImpl( - tracesProductionCoordinator = TracesConflationCoordinatorImpl(tracesConflationClient, zkStateClient), - zkProofProductionCoordinator = ZkProofCreationCoordinatorImpl( - executionProverClient = executionProverClient, - l2EthApiClient = createEthApiClient( - rpcUrl = configs.conflation.l2Endpoint.toString(), - log = LogManager.getLogger("clients.l2.eth.conflation"), - requestRetryConfig = configs.conflation.l2RequestRetries, - vertx = vertx, - ), - messageServiceAddress = configs.protocol.l2.contractAddress, - ), - batchProofHandler = batchProofHandler, - vertx = vertx, - config = ProofGeneratingConflationHandlerImpl.Config(5.seconds), - ) - - val highestConflationTracker = HighestConflationTracker(lastProcessedBlockNumber) - metricsFacade.createGauge( - category = LineaMetricsCategory.CONFLATION, - name = "last.block.number", - description = "Last conflated block number", - measurementSupplier = highestConflationTracker, - ) - val conflationsCounter = metricsFacade.createCounter( - category = LineaMetricsCategory.CONFLATION, - name = "counter", - description = "Counter of new conflations", - ) - - SimpleCompositeSafeFutureHandler( - listOf( - proofGeneratingConflationHandlerImpl::handleConflatedBatch, - highestConflationTracker, - { - conflationsCounter.increment() - SafeFuture.COMPLETE - }, - ), - ) - } - - conflationService.onConflatedBatch(blobsConflationHandler) - - BlockToBatchSubmissionCoordinator( - conflationService = conflationService, - tracesCountersClient = tracesCountersClient, - vertx = vertx, - encoder = BlockRLPEncoder, - ) - } - - private val lastProvenBlockNumberProvider = run { - val lastProvenConsecutiveBatchBlockNumberProvider = BatchesRepoBasedLastProvenBlockNumberProvider( - lastProcessedBlockNumber.toLong(), - batchesRepository, - ) - metricsFacade.createGauge( - category = LineaMetricsCategory.BATCH, - name = "proven.highest.consecutive.block.number", - description = "Highest proven consecutive execution batch block number", - measurementSupplier = { lastProvenConsecutiveBatchBlockNumberProvider.getLastKnownProvenBlockNumber() }, - ) - lastProvenConsecutiveBatchBlockNumberProvider - } - - private val blockCreationMonitor = run { - log.info("Resuming conflation from block={} inclusive", lastProcessedBlockNumber + 1UL) - val blockCreationMonitor = BlockCreationMonitor( - vertx = vertx, - web3j = ExtendedWeb3JImpl(l2Web3jClientForBlockCreation), - startingBlockNumberExclusive = lastProcessedBlockNumber.toLong(), - blockCreationListener = block2BatchCoordinator, - lastProvenBlockNumberProviderAsync = lastProvenBlockNumberProvider, - config = BlockCreationMonitor.Config( - pollingInterval = configs.conflation.blocksPollingInterval, - blocksToFinalization = 0L, - blocksFetchLimit = configs.conflation.l2FetchBlocksLimit.toLong(), - // We need to add 1 to forceStopConflationAtBlockInclusive because conflation calculator requires - // block_number = forceStopConflationAtBlockInclusive + 1 to trigger conflation at - // forceStopConflationAtBlockInclusive - lastL2BlockNumberToProcessInclusive = configs.conflation.forceStopConflationAtBlockInclusive?.inc(), - ), - ) - blockCreationMonitor - } - private fun lastFinalizedBlock(): SafeFuture { val l1BasedLastFinalizedBlockProvider = L1BasedLastFinalizedBlockProvider( vertx, @@ -916,56 +476,21 @@ class L1DependentApp( return l1BasedLastFinalizedBlockProvider.getLastFinalizedBlock() } - private val messageAnchoringApp: LongRunningService = if (configs.messageAnchoring.isEnabled()) { - configs.messageAnchoring!! - val l1Web3jClient = createWeb3jHttpClient( - rpcUrl = configs.messageAnchoring.l1Endpoint.toString(), - log = LogManager.getLogger("clients.l1.eth.message-anchoring"), - ) - val l2Web3jClient = createWeb3jHttpClient( - rpcUrl = configs.messageAnchoring.l2Endpoint.toString(), - log = LogManager.getLogger("clients.l2.eth.message-anchoring"), - ) - val l2TransactionManager = createTransactionManager( - vertx = vertx, - signerConfig = configs.messageAnchoring.signer, - client = l2Web3jClient, - ) - MessageAnchoringApp( - vertx = vertx, - config = MessageAnchoringApp.Config( - l1RequestRetryConfig = configs.messageAnchoring.l1RequestRetries, - l1PollingInterval = configs.messageAnchoring.l1EventScrapping.pollingInterval, - l1SuccessBackoffDelay = configs.messageAnchoring.l1EventScrapping.ethLogsSearchSuccessBackoffDelay, - l1ContractAddress = configs.protocol.l1.contractAddress, - l1EventPollingTimeout = configs.messageAnchoring.l1EventScrapping.pollingTimeout, - l1EventSearchBlockChunk = configs.messageAnchoring.l1EventScrapping.ethLogsSearchBlockChunkSize, - l1HighestBlockTag = configs.messageAnchoring.l1HighestBlockTag, - l2HighestBlockTag = configs.messageAnchoring.l2HighestBlockTag, - anchoringTickInterval = configs.messageAnchoring.anchoringTickInterval, - messageQueueCapacity = configs.messageAnchoring.messageQueueCapacity, - maxMessagesToAnchorPerL2Transaction = configs.messageAnchoring.maxMessagesToAnchorPerL2Transaction, - ), - l1EthApiClient = createEthApiClient( - web3jClient = l1Web3jClient, - requestRetryConfig = null, - vertx = vertx, - ), - l2MessageService = Web3JL2MessageServiceSmartContractClient.create( - web3jClient = l2Web3jClient, - contractAddress = configs.protocol.l2.contractAddress, - gasLimit = configs.messageAnchoring.gas.gasLimit, - maxFeePerGasCap = configs.messageAnchoring.gas.maxFeePerGasCap, - feeHistoryBlockCount = configs.messageAnchoring.gas.feeHistoryBlockCount, - feeHistoryRewardPercentile = configs.messageAnchoring.gas.feeHistoryRewardPercentile.toDouble(), - transactionManager = l2TransactionManager, - smartContractErrors = smartContractErrors, - smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(), - ), - ) - } else { - DisabledLongRunningService - } + private val messageAnchoringApp: LongRunningService = MessageAnchoringAppConfigurator.create( + vertx = vertx, + configs = configs, + ) + + private val conflationApp = ConflationApp( + vertx = vertx, + configs = configs, + batchesRepository = batchesRepository, + blobsRepository = blobsRepository, + aggregationsRepository = aggregationsRepository, + lastFinalizedBlock = lastFinalizedBlock, + metricsFacade = metricsFacade, + httpJsonRpcClientFactory = httpJsonRpcClientFactory, + ) private val l2NetworkGasPricingService: L2NetworkGasPricingService? = if (configs.l2NetworkGasPricing.isEnabled()) { @@ -1103,7 +628,7 @@ class L1DependentApp( init { mapOf( "last_proven_block_provider" to FinalizationHandler { update: FinalizationMonitor.FinalizationUpdate -> - lastProvenBlockNumberProvider.updateLatestL1FinalizedBlock(update.blockNumber.toLong()) + conflationApp.updateLatestL1FinalizedBlock(update.blockNumber.toLong()) }, "finalized records cleanup" to RecordsCleanupFinalizationHandler( batchesRepository = batchesRepository, @@ -1120,24 +645,14 @@ class L1DependentApp( } override fun start(): CompletableFuture { - return cleanupDbDataAfterBlockNumbers( - lastProcessedBlockNumber = lastProcessedBlockNumber, - lastConsecutiveAggregatedBlockNumber = lastConsecutiveAggregatedBlockNumber, - batchesRepository = batchesRepository, - blobsRepository = blobsRepository, - aggregationsRepository = aggregationsRepository, - ) - .thenCompose { l1FinalizationMonitor.start() } + return l1FinalizationMonitor.start() .thenCompose { l1FinalizationHandlerForShomeiRpc.start() } .thenCompose { blobSubmissionCoordinator.start() } .thenCompose { aggregationFinalizationCoordinator.start() } - .thenCompose { proofAggregationCoordinatorService.start() } .thenCompose { messageAnchoringApp.start() } + .thenCompose { conflationApp.start() } .thenCompose { l2NetworkGasPricingService?.start() ?: SafeFuture.completedFuture(Unit) } .thenCompose { l1FeeHistoryCachingService.start() } - .thenCompose { deadlineConflationCalculatorRunner?.start() ?: SafeFuture.completedFuture(Unit) } - .thenCompose { blockCreationMonitor.start() } - .thenCompose { blobCompressionProofCoordinator.start() } .thenPeek { log.info("L1App started") } @@ -1145,69 +660,19 @@ class L1DependentApp( override fun stop(): CompletableFuture { return SafeFuture.allOf( + conflationApp.stop(), l1FinalizationMonitor.stop(), l1FinalizationHandlerForShomeiRpc.stop(), blobSubmissionCoordinator.stop(), aggregationFinalizationCoordinator.stop(), - proofAggregationCoordinatorService.stop(), messageAnchoringApp.stop(), l2NetworkGasPricingService?.stop() ?: SafeFuture.completedFuture(Unit), l1FeeHistoryCachingService.stop(), - blockCreationMonitor.stop(), - deadlineConflationCalculatorRunner?.stop() ?: SafeFuture.completedFuture(Unit), - blobCompressionProofCoordinator.stop(), ) .thenApply { log.info("L1App Stopped") } } companion object { - - fun cleanupDbDataAfterBlockNumbers( - lastProcessedBlockNumber: ULong, - lastConsecutiveAggregatedBlockNumber: ULong, - batchesRepository: BatchesRepository, - blobsRepository: BlobsRepository, - aggregationsRepository: AggregationsRepository, - ): SafeFuture<*> { - val blockNumberInclusiveToDeleteFrom = lastProcessedBlockNumber + 1u - val cleanupBatches = batchesRepository.deleteBatchesAfterBlockNumber(blockNumberInclusiveToDeleteFrom.toLong()) - val cleanupBlobs = blobsRepository.deleteBlobsAfterBlockNumber(blockNumberInclusiveToDeleteFrom) - val cleanupAggregations = aggregationsRepository - .deleteAggregationsAfterBlockNumber((lastConsecutiveAggregatedBlockNumber + 1u).toLong()) - - return SafeFuture.allOf(cleanupBatches, cleanupBlobs, cleanupAggregations) - } - - /** - * Returns the last block number inclusive upto which we have consecutive proven blobs or the last finalized block - * number inclusive - */ - fun resumeConflationFrom( - aggregationsRepository: AggregationsRepository, - lastFinalizedBlock: ULong, - ): SafeFuture { - return aggregationsRepository - .findConsecutiveProvenBlobs(lastFinalizedBlock.toLong() + 1) - .thenApply { blobAndBatchCounters -> - if (blobAndBatchCounters.isNotEmpty()) { - blobAndBatchCounters.last().blobCounters.endBlockNumber - } else { - lastFinalizedBlock - } - } - } - - fun resumeAggregationFrom( - aggregationsRepository: AggregationsRepository, - lastFinalizedBlock: ULong, - ): SafeFuture { - return aggregationsRepository - .findHighestConsecutiveEndBlockNumber(lastFinalizedBlock.toLong() + 1) - .thenApply { highestEndBlockNumber -> - highestEndBlockNumber?.toULong() ?: lastFinalizedBlock - } - } - fun setupL1FinalizationMonitorForShomeiFrontend( type2StateProofProviderConfig: linea.coordinator.config.v2.Type2StateProofManagerConfig, httpJsonRpcClientFactory: VertxHttpJsonRpcClientFactory, diff --git a/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/MessageAnchoringAppConfigurator.kt b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/MessageAnchoringAppConfigurator.kt new file mode 100644 index 00000000..f8f6cfdc --- /dev/null +++ b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/MessageAnchoringAppConfigurator.kt @@ -0,0 +1,71 @@ +package net.consensys.zkevm.coordinator.app + +import io.vertx.core.Vertx +import linea.anchoring.MessageAnchoringApp +import linea.contract.l2.Web3JL2MessageServiceSmartContractClient +import linea.coordinator.config.v2.CoordinatorConfig +import linea.coordinator.config.v2.isDisabled +import linea.web3j.createWeb3jHttpClient +import linea.web3j.ethapi.createEthApiClient +import net.consensys.zkevm.LongRunningService +import org.apache.logging.log4j.LogManager + +object MessageAnchoringAppConfigurator { + fun create( + vertx: Vertx, + configs: CoordinatorConfig, + ): LongRunningService { + if (configs.messageAnchoring.isDisabled()) { + LogManager.getLogger(MessageAnchoringApp::class.java).warn("Message anchoring is disabled") + return DisabledLongRunningService + } + configs.messageAnchoring!! + + val l1Web3jClient = createWeb3jHttpClient( + rpcUrl = configs.messageAnchoring.l1Endpoint.toString(), + log = LogManager.getLogger("clients.l1.eth.message-anchoring"), + ) + val l2Web3jClient = createWeb3jHttpClient( + rpcUrl = configs.messageAnchoring.l2Endpoint.toString(), + log = LogManager.getLogger("clients.l2.eth.message-anchoring"), + ) + val l2TransactionManager = createTransactionManager( + vertx = vertx, + signerConfig = configs.messageAnchoring.signer, + client = l2Web3jClient, + ) + val messageAnchoringApp = MessageAnchoringApp( + vertx = vertx, + config = MessageAnchoringApp.Config( + l1RequestRetryConfig = configs.messageAnchoring.l1RequestRetries, + l1PollingInterval = configs.messageAnchoring.l1EventScrapping.pollingInterval, + l1SuccessBackoffDelay = configs.messageAnchoring.l1EventScrapping.ethLogsSearchSuccessBackoffDelay, + l1ContractAddress = configs.protocol.l1.contractAddress, + l1EventPollingTimeout = configs.messageAnchoring.l1EventScrapping.pollingTimeout, + l1EventSearchBlockChunk = configs.messageAnchoring.l1EventScrapping.ethLogsSearchBlockChunkSize, + l1HighestBlockTag = configs.messageAnchoring.l1HighestBlockTag, + l2HighestBlockTag = configs.messageAnchoring.l2HighestBlockTag, + anchoringTickInterval = configs.messageAnchoring.anchoringTickInterval, + messageQueueCapacity = configs.messageAnchoring.messageQueueCapacity, + maxMessagesToAnchorPerL2Transaction = configs.messageAnchoring.maxMessagesToAnchorPerL2Transaction, + ), + l1EthApiClient = createEthApiClient( + web3jClient = l1Web3jClient, + requestRetryConfig = null, + vertx = vertx, + ), + l2MessageService = Web3JL2MessageServiceSmartContractClient.create( + web3jClient = l2Web3jClient, + contractAddress = configs.protocol.l2.contractAddress, + gasLimit = configs.messageAnchoring.gas.gasLimit, + maxFeePerGasCap = configs.messageAnchoring.gas.maxFeePerGasCap, + feeHistoryBlockCount = configs.messageAnchoring.gas.feeHistoryBlockCount, + feeHistoryRewardPercentile = configs.messageAnchoring.gas.feeHistoryRewardPercentile.toDouble(), + transactionManager = l2TransactionManager, + smartContractErrors = configs.smartContractErrors, + smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(), + ), + ) + return messageAnchoringApp + } +} diff --git a/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationApp.kt b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationApp.kt new file mode 100644 index 00000000..478786e5 --- /dev/null +++ b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationApp.kt @@ -0,0 +1,509 @@ +package net.consensys.zkevm.coordinator.app.conflation + +import build.linea.clients.StateManagerClientV1 +import build.linea.clients.StateManagerV1JsonRpcClient +import io.vertx.core.Vertx +import kotlinx.datetime.Clock +import linea.blob.ShnarfCalculatorVersion +import linea.contract.l2.Web3JL2MessageServiceSmartContractClient +import linea.coordinator.config.toJsonRpcRetry +import linea.coordinator.config.v2.CoordinatorConfig +import linea.coordinator.config.v2.isDisabled +import linea.domain.RetryConfig +import linea.encoding.BlockRLPEncoder +import linea.web3j.ExtendedWeb3JImpl +import linea.web3j.createWeb3jHttpClient +import linea.web3j.ethapi.createEthApiClient +import net.consensys.linea.contract.l1.GenesisStateProvider +import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory +import net.consensys.linea.metrics.LineaMetricsCategory +import net.consensys.linea.metrics.MetricsFacade +import net.consensys.linea.traces.TracesCountersV2 +import net.consensys.zkevm.LongRunningService +import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.cleanupDbDataAfterBlockNumbers +import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeAggregationFrom +import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeConflationFrom +import net.consensys.zkevm.coordinator.blockcreation.BatchesRepoBasedLastProvenBlockNumberProvider +import net.consensys.zkevm.coordinator.blockcreation.BlockCreationMonitor +import net.consensys.zkevm.coordinator.blockcreation.GethCliqueSafeBlockProvider +import net.consensys.zkevm.coordinator.clients.ExecutionProverClientV2 +import net.consensys.zkevm.coordinator.clients.TracesGeneratorJsonRpcClientV2 +import net.consensys.zkevm.coordinator.clients.prover.ProverClientFactory +import net.consensys.zkevm.domain.BlocksConflation +import net.consensys.zkevm.ethereum.coordination.HighestConflationTracker +import net.consensys.zkevm.ethereum.coordination.HighestProvenBatchTracker +import net.consensys.zkevm.ethereum.coordination.HighestProvenBlobTracker +import net.consensys.zkevm.ethereum.coordination.HighestULongTracker +import net.consensys.zkevm.ethereum.coordination.HighestUnprovenBlobTracker +import net.consensys.zkevm.ethereum.coordination.SimpleCompositeSafeFutureHandler +import net.consensys.zkevm.ethereum.coordination.aggregation.ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker +import net.consensys.zkevm.ethereum.coordination.aggregation.ProofAggregationCoordinatorService +import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofCoordinator +import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionProofUpdate +import net.consensys.zkevm.ethereum.coordination.blob.BlobZkStateProviderImpl +import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobCompressor +import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobShnarfCalculator +import net.consensys.zkevm.ethereum.coordination.blob.RollingBlobShnarfCalculator +import net.consensys.zkevm.ethereum.coordination.conflation.BlockToBatchSubmissionCoordinator +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculator +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByBlockLimit +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByDataCompressed +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByExecutionTraces +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTargetBlockNumbers +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationCalculatorByTimeDeadline +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationService +import net.consensys.zkevm.ethereum.coordination.conflation.ConflationServiceImpl +import net.consensys.zkevm.ethereum.coordination.conflation.DeadlineConflationCalculatorRunner +import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlobAwareConflationCalculator +import net.consensys.zkevm.ethereum.coordination.conflation.GlobalBlockConflationCalculator +import net.consensys.zkevm.ethereum.coordination.conflation.ProofGeneratingConflationHandlerImpl +import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCalculator +import net.consensys.zkevm.ethereum.coordination.conflation.TracesConflationCoordinatorImpl +import net.consensys.zkevm.ethereum.coordination.proofcreation.ZkProofCreationCoordinatorImpl +import net.consensys.zkevm.persistence.AggregationsRepository +import net.consensys.zkevm.persistence.BatchesRepository +import net.consensys.zkevm.persistence.BlobsRepository +import net.consensys.zkevm.persistence.dao.batch.persistence.BatchProofHandlerImpl +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import org.web3j.protocol.Web3j +import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.util.concurrent.CompletableFuture +import kotlin.time.Duration.Companion.seconds + +class ConflationApp( + val vertx: Vertx, + val batchesRepository: BatchesRepository, + val blobsRepository: BlobsRepository, + val aggregationsRepository: AggregationsRepository, + val lastFinalizedBlock: ULong, + val configs: CoordinatorConfig, + val metricsFacade: MetricsFacade, + val httpJsonRpcClientFactory: VertxHttpJsonRpcClientFactory, +) : LongRunningService { + private val log = LogManager.getLogger("conflation.app") + + private val lastProcessedBlockNumber = resumeConflationFrom( + aggregationsRepository, + lastFinalizedBlock, + ).get() + private val lastConsecutiveAggregatedBlockNumber = resumeAggregationFrom( + aggregationsRepository, + lastFinalizedBlock, + ).get() + + val l2Web3jClient: Web3j = createWeb3jHttpClient( + rpcUrl = configs.conflation.l2Endpoint.toString(), + log = LogManager.getLogger("clients.l2.eth.conflation"), + ) + + private val deadlineConflationCalculatorRunner = createDeadlineConflationCalculatorRunner(l2Web3jClient) + + private val conflationCalculator: TracesConflationCalculator = run { + val logger = LogManager.getLogger(GlobalBlockConflationCalculator::class.java) + + // To fail faster for JNA reasons + val blobCompressor = GoBackedBlobCompressor.Companion.getInstance( + compressorVersion = configs.conflation.blobCompression.blobCompressorVersion, + dataLimit = configs.conflation.blobCompression.blobSizeLimit, + metricsFacade = metricsFacade, + ) + + val compressedBlobCalculator = ConflationCalculatorByDataCompressed( + blobCompressor = blobCompressor, + ) + val globalCalculator = GlobalBlockConflationCalculator( + lastBlockNumber = lastProcessedBlockNumber, + syncCalculators = createCalculatorsForBlobsAndConflation(logger, compressedBlobCalculator), + deferredTriggerConflationCalculators = listOfNotNull(deadlineConflationCalculatorRunner), + emptyTracesCounters = TracesCountersV2.Companion.EMPTY_TRACES_COUNT, + log = logger, + ) + + val batchesLimit = configs.conflation.blobCompression.batchesLimit + ?: (configs.conflation.proofAggregation.proofsLimit - 1U) + GlobalBlobAwareConflationCalculator( + conflationCalculator = globalCalculator, + blobCalculator = compressedBlobCalculator, + metricsFacade = metricsFacade, + batchesLimit = batchesLimit, + ) + } + private val conflationService: ConflationService = + ConflationServiceImpl(calculator = conflationCalculator, metricsFacade = metricsFacade) + + private val zkStateClient: StateManagerClientV1 = StateManagerV1JsonRpcClient.Companion.create( + rpcClientFactory = httpJsonRpcClientFactory, + endpoints = configs.stateManager.endpoints.map { it.toURI() }, + maxInflightRequestsPerClient = configs.stateManager.requestLimitPerEndpoint, + requestRetry = configs.stateManager.requestRetries.toJsonRpcRetry(), + zkStateManagerVersion = configs.stateManager.version, + logger = LogManager.getLogger("clients.StateManagerShomeiClient"), + ) + + private val proverClientFactory = ProverClientFactory( + vertx = vertx, + config = configs.proversConfig, + metricsFacade = metricsFacade, + ) + + private val blobCompressionProofCoordinator = run { + val maxProvenBlobCache = run { + val highestProvenBlobTracker = HighestProvenBlobTracker(lastProcessedBlockNumber) + metricsFacade.createGauge( + category = LineaMetricsCategory.BLOB, + name = "proven.highest.block.number", + description = "Highest proven blob compression block number", + measurementSupplier = highestProvenBlobTracker, + ) + highestProvenBlobTracker + } + val blobCompressionProofHandler: (BlobCompressionProofUpdate) -> SafeFuture<*> = SimpleCompositeSafeFutureHandler( + listOf( + maxProvenBlobCache, + ), + ) + val genesisStateProvider = GenesisStateProvider( + stateRootHash = configs.protocol.genesis.genesisStateRootHash, + shnarf = configs.protocol.genesis.genesisShnarf, + ) + + val blobCompressionProofCoordinator = BlobCompressionProofCoordinator( + vertx = vertx, + blobsRepository = blobsRepository, + blobCompressionProverClient = proverClientFactory.blobCompressionProverClient(), + rollingBlobShnarfCalculator = RollingBlobShnarfCalculator( + blobShnarfCalculator = GoBackedBlobShnarfCalculator( + version = ShnarfCalculatorVersion.V1_2, + metricsFacade = metricsFacade, + ), + blobsRepository = blobsRepository, + genesisShnarf = genesisStateProvider.shnarf, + ), + blobZkStateProvider = BlobZkStateProviderImpl( + zkStateClient = zkStateClient, + ), + config = BlobCompressionProofCoordinator.Config( + pollingInterval = configs.conflation.blobCompression.handlerPollingInterval, + ), + blobCompressionProofHandler = blobCompressionProofHandler, + metricsFacade = metricsFacade, + ) + val highestUnprovenBlobTracker = HighestUnprovenBlobTracker(lastProcessedBlockNumber) + metricsFacade.createGauge( + category = LineaMetricsCategory.BLOB, + name = "unproven.highest.block.number", + description = "Block number of highest unproven blob produced", + measurementSupplier = highestUnprovenBlobTracker, + ) + + val compositeSafeFutureHandler = SimpleCompositeSafeFutureHandler( + listOf( + blobCompressionProofCoordinator::handleBlob, + highestUnprovenBlobTracker, + ), + ) + conflationCalculator.onBlobCreation(compositeSafeFutureHandler) + blobCompressionProofCoordinator + } + + private val proofAggregationCoordinatorService: LongRunningService = run { + val maxBlobEndBlockNumberTracker = ConsecutiveProvenBlobsProviderWithLastEndBlockNumberTracker( + aggregationsRepository, + lastProcessedBlockNumber, + ) + + metricsFacade.createGauge( + category = LineaMetricsCategory.BLOB, + name = "proven.highest.consecutive.block.number", + description = "Highest consecutive proven blob compression block number", + measurementSupplier = maxBlobEndBlockNumberTracker, + ) + + val highestAggregationTracker = HighestULongTracker(lastConsecutiveAggregatedBlockNumber) + metricsFacade.createGauge( + category = LineaMetricsCategory.AGGREGATION, + name = "proven.highest.block.number", + description = "Highest proven aggregation block number", + measurementSupplier = highestAggregationTracker, + ) + + ProofAggregationCoordinatorService.Companion + .create( + vertx = vertx, + aggregationCoordinatorPollingInterval = configs.conflation.proofAggregation.coordinatorPollingInterval, + deadlineCheckInterval = configs.conflation.proofAggregation.deadlineCheckInterval, + aggregationDeadline = configs.conflation.proofAggregation.deadline, + latestBlockProvider = GethCliqueSafeBlockProvider( + web3j = l2Web3jClient, + config = GethCliqueSafeBlockProvider.Config(0), + ), + maxProofsPerAggregation = configs.conflation.proofAggregation.proofsLimit, + startBlockNumberInclusive = lastConsecutiveAggregatedBlockNumber + 1u, + aggregationsRepository = aggregationsRepository, + consecutiveProvenBlobsProvider = maxBlobEndBlockNumberTracker, + proofAggregationClient = proverClientFactory.proofAggregationProverClient(), + l2EthApiClient = createEthApiClient( + l2Web3jClient, + requestRetryConfig = RetryConfig( + backoffDelay = 1.seconds, + failuresWarningThreshold = 3u, + ), + vertx = vertx, + ), + l2MessageService = Web3JL2MessageServiceSmartContractClient.Companion.createReadOnly( + web3jClient = l2Web3jClient, + contractAddress = configs.protocol.l2.contractAddress, + smartContractErrors = configs.smartContractErrors, + smartContractDeploymentBlockNumber = configs.protocol.l2.contractDeploymentBlockNumber?.getNumber(), + ), + aggregationDeadlineDelay = configs.conflation.conflationDeadlineLastBlockConfirmationDelay, + targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks ?: emptyList(), + metricsFacade = metricsFacade, + provenAggregationEndBlockNumberConsumer = { aggEndBlockNumber -> highestAggregationTracker(aggEndBlockNumber) }, + aggregationSizeMultipleOf = configs.conflation.proofAggregation.aggregationSizeMultipleOf, + ) + } + + private val block2BatchCoordinator = run { + val tracesCountersClient = run { + val tracesCountersLog = LogManager.getLogger("clients.traces.counters") + TracesGeneratorJsonRpcClientV2( + vertx = vertx, + rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing( + endpoints = configs.traces.counters.endpoints.toSet(), + maxInflightRequestsPerClient = configs.traces.counters.requestLimitPerEndpoint, + log = tracesCountersLog, + ), + config = TracesGeneratorJsonRpcClientV2.Config( + expectedTracesApiVersion = configs.traces.expectedTracesApiVersion, + ), + retryConfig = configs.traces.counters.requestRetries.toJsonRpcRetry(), + log = tracesCountersLog, + ) + } + + val tracesConflationClient = run { + val tracesConflationLog = LogManager.getLogger("clients.traces.conflation") + TracesGeneratorJsonRpcClientV2( + vertx = vertx, + rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing( + endpoints = configs.traces.conflation.endpoints.toSet(), + maxInflightRequestsPerClient = configs.traces.conflation.requestLimitPerEndpoint, + log = tracesConflationLog, + ), + config = TracesGeneratorJsonRpcClientV2.Config( + expectedTracesApiVersion = configs.traces.expectedTracesApiVersion, + ), + retryConfig = configs.traces.conflation.requestRetries.toJsonRpcRetry(), + log = tracesConflationLog, + ) + } + + val blobsConflationHandler: (BlocksConflation) -> SafeFuture<*> = run { + val maxProvenBatchCache = run { + val highestProvenBatchTracker = HighestProvenBatchTracker(lastProcessedBlockNumber) + metricsFacade.createGauge( + category = LineaMetricsCategory.BATCH, + name = "proven.highest.block.number", + description = "Highest proven batch execution block number", + measurementSupplier = highestProvenBatchTracker, + ) + highestProvenBatchTracker + } + + val batchProofHandler = SimpleCompositeSafeFutureHandler( + listOf( + maxProvenBatchCache, + BatchProofHandlerImpl(batchesRepository)::acceptNewBatch, + ), + ) + val executionProverClient: ExecutionProverClientV2 = proverClientFactory.executionProverClient( + // we cannot use configs.traces.expectedTracesApiVersion because it breaks prover expected version pattern + tracesVersion = "2.1.0", + stateManagerVersion = configs.stateManager.version, + ) + + val proofGeneratingConflationHandlerImpl = ProofGeneratingConflationHandlerImpl( + tracesProductionCoordinator = TracesConflationCoordinatorImpl(tracesConflationClient, zkStateClient), + zkProofProductionCoordinator = ZkProofCreationCoordinatorImpl( + executionProverClient = executionProverClient, + l2EthApiClient = createEthApiClient( + rpcUrl = configs.conflation.l2Endpoint.toString(), + log = LogManager.getLogger("clients.l2.eth.conflation"), + requestRetryConfig = configs.conflation.l2RequestRetries, + vertx = vertx, + ), + messageServiceAddress = configs.protocol.l2.contractAddress, + ), + batchProofHandler = batchProofHandler, + vertx = vertx, + config = ProofGeneratingConflationHandlerImpl.Config(5.seconds), + ) + + val highestConflationTracker = HighestConflationTracker(lastProcessedBlockNumber) + metricsFacade.createGauge( + category = LineaMetricsCategory.CONFLATION, + name = "last.block.number", + description = "Last conflated block number", + measurementSupplier = highestConflationTracker, + ) + val conflationsCounter = metricsFacade.createCounter( + category = LineaMetricsCategory.CONFLATION, + name = "counter", + description = "Counter of new conflations", + ) + + SimpleCompositeSafeFutureHandler( + listOf( + proofGeneratingConflationHandlerImpl::handleConflatedBatch, + highestConflationTracker, + { + conflationsCounter.increment() + SafeFuture.COMPLETE + }, + ), + ) + } + + conflationService.onConflatedBatch(blobsConflationHandler) + + BlockToBatchSubmissionCoordinator( + conflationService = conflationService, + tracesCountersClient = tracesCountersClient, + vertx = vertx, + encoder = BlockRLPEncoder, + ) + } + + private val lastProvenBlockNumberProvider = run { + val lastProvenConsecutiveBatchBlockNumberProvider = BatchesRepoBasedLastProvenBlockNumberProvider( + lastProcessedBlockNumber.toLong(), + batchesRepository, + ) + metricsFacade.createGauge( + category = LineaMetricsCategory.BATCH, + name = "proven.highest.consecutive.block.number", + description = "Highest proven consecutive execution batch block number", + measurementSupplier = { lastProvenConsecutiveBatchBlockNumberProvider.getLastKnownProvenBlockNumber() }, + ) + lastProvenConsecutiveBatchBlockNumberProvider + } + + private val blockCreationMonitor = run { + log.info("Resuming conflation from block={} inclusive", lastProcessedBlockNumber + 1UL) + val blockCreationMonitor = BlockCreationMonitor( + vertx = vertx, + web3j = ExtendedWeb3JImpl(l2Web3jClient), + startingBlockNumberExclusive = lastProcessedBlockNumber.toLong(), + blockCreationListener = block2BatchCoordinator, + lastProvenBlockNumberProviderAsync = lastProvenBlockNumberProvider, + config = BlockCreationMonitor.Config( + pollingInterval = configs.conflation.blocksPollingInterval, + blocksToFinalization = 0L, + blocksFetchLimit = configs.conflation.l2FetchBlocksLimit.toLong(), + // We need to add 1 to forceStopConflationAtBlockInclusive because conflation calculator requires + // block_number = forceStopConflationAtBlockInclusive + 1 to trigger conflation at + // forceStopConflationAtBlockInclusive + lastL2BlockNumberToProcessInclusive = configs.conflation.forceStopConflationAtBlockInclusive?.inc(), + ), + ) + blockCreationMonitor + } + + override fun start(): CompletableFuture { + return cleanupDbDataAfterBlockNumbers( + lastProcessedBlockNumber = lastProcessedBlockNumber, + lastConsecutiveAggregatedBlockNumber = lastConsecutiveAggregatedBlockNumber, + batchesRepository = batchesRepository, + blobsRepository = blobsRepository, + aggregationsRepository = aggregationsRepository, + ) + .thenCompose { proofAggregationCoordinatorService.start() } + .thenCompose { deadlineConflationCalculatorRunner?.start() ?: SafeFuture.completedFuture(Unit) } + .thenCompose { blockCreationMonitor.start() } + .thenCompose { blobCompressionProofCoordinator.start() } + .thenPeek { + log.info("Conflation started") + } + } + + override fun stop(): CompletableFuture { + return SafeFuture.allOf( + proofAggregationCoordinatorService.stop(), + blockCreationMonitor.stop(), + deadlineConflationCalculatorRunner?.stop() ?: SafeFuture.completedFuture(Unit), + blobCompressionProofCoordinator.stop(), + ) + .thenApply { log.info("Conflation Stopped") } + } + + fun updateLatestL1FinalizedBlock(blockNumber: Long): SafeFuture { + return lastProvenBlockNumberProvider.updateLatestL1FinalizedBlock(blockNumber) + } + + private fun createDeadlineConflationCalculatorRunner( + l2Web3jClient: Web3j, + ): DeadlineConflationCalculatorRunner? { + if (configs.conflation.isDisabled() || configs.conflation.conflationDeadline == null) { + log.info("Conflation deadline calculator is disabled") + return null + } + + return DeadlineConflationCalculatorRunner( + conflationDeadlineCheckInterval = configs.conflation.conflationDeadlineCheckInterval, + delegate = ConflationCalculatorByTimeDeadline( + config = ConflationCalculatorByTimeDeadline.Config( + conflationDeadline = configs.conflation.conflationDeadline, + conflationDeadlineLastBlockConfirmationDelay = + configs.conflation.conflationDeadlineLastBlockConfirmationDelay, + ), + lastBlockNumber = lastProcessedBlockNumber, + clock = Clock.System, + latestBlockProvider = GethCliqueSafeBlockProvider( + l2Web3jClient, + GethCliqueSafeBlockProvider.Config(blocksToFinalization = 0), + ), + ), + ) + } + + private fun addBlocksLimitCalculatorIfDefined(calculators: MutableList) { + if (configs.conflation.blocksLimit != null) { + calculators.add( + ConflationCalculatorByBlockLimit( + blockLimit = configs.conflation.blocksLimit, + ), + ) + } + } + + private fun addTargetEndBlockConflationCalculatorIfDefined(calculators: MutableList) { + if (configs.conflation.proofAggregation.targetEndBlocks?.isNotEmpty() ?: false) { + calculators.add( + ConflationCalculatorByTargetBlockNumbers( + targetEndBlockNumbers = configs.conflation.proofAggregation.targetEndBlocks!!.toSet(), + ), + ) + } + } + + private fun createCalculatorsForBlobsAndConflation( + logger: Logger, + compressedBlobCalculator: ConflationCalculatorByDataCompressed, + ): List { + val calculators: MutableList = + mutableListOf( + ConflationCalculatorByExecutionTraces( + tracesCountersLimit = configs.conflation.tracesLimitsV2, + emptyTracesCounters = TracesCountersV2.Companion.EMPTY_TRACES_COUNT, + metricsFacade = metricsFacade, + log = logger, + ), + compressedBlobCalculator, + ) + addBlocksLimitCalculatorIfDefined(calculators) + addTargetEndBlockConflationCalculatorIfDefined(calculators) + return calculators + } +} diff --git a/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationAppHelper.kt b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationAppHelper.kt new file mode 100644 index 00000000..14525d82 --- /dev/null +++ b/coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationAppHelper.kt @@ -0,0 +1,54 @@ +package net.consensys.zkevm.coordinator.app.conflation + +import net.consensys.zkevm.persistence.AggregationsRepository +import net.consensys.zkevm.persistence.BatchesRepository +import net.consensys.zkevm.persistence.BlobsRepository +import tech.pegasys.teku.infrastructure.async.SafeFuture + +object ConflationAppHelper { + /** + * Returns the last block number inclusive upto which we have consecutive proven blobs or the last finalized block + * number inclusive + */ + fun resumeConflationFrom( + aggregationsRepository: AggregationsRepository, + lastFinalizedBlock: ULong, + ): SafeFuture { + return aggregationsRepository + .findConsecutiveProvenBlobs(lastFinalizedBlock.toLong() + 1) + .thenApply { blobAndBatchCounters -> + if (blobAndBatchCounters.isNotEmpty()) { + blobAndBatchCounters.last().blobCounters.endBlockNumber + } else { + lastFinalizedBlock + } + } + } + + fun resumeAggregationFrom( + aggregationsRepository: AggregationsRepository, + lastFinalizedBlock: ULong, + ): SafeFuture { + return aggregationsRepository + .findHighestConsecutiveEndBlockNumber(lastFinalizedBlock.toLong() + 1) + .thenApply { highestEndBlockNumber -> + highestEndBlockNumber?.toULong() ?: lastFinalizedBlock + } + } + + fun cleanupDbDataAfterBlockNumbers( + lastProcessedBlockNumber: ULong, + lastConsecutiveAggregatedBlockNumber: ULong, + batchesRepository: BatchesRepository, + blobsRepository: BlobsRepository, + aggregationsRepository: AggregationsRepository, + ): SafeFuture<*> { + val blockNumberInclusiveToDeleteFrom = lastProcessedBlockNumber + 1u + val cleanupBatches = batchesRepository.deleteBatchesAfterBlockNumber(blockNumberInclusiveToDeleteFrom.toLong()) + val cleanupBlobs = blobsRepository.deleteBlobsAfterBlockNumber(blockNumberInclusiveToDeleteFrom) + val cleanupAggregations = aggregationsRepository + .deleteAggregationsAfterBlockNumber((lastConsecutiveAggregatedBlockNumber + 1u).toLong()) + + return SafeFuture.allOf(cleanupBatches, cleanupBlobs, cleanupAggregations) + } +} diff --git a/coordinator/app/src/test/kotlin/net/consensys/zkevm/coordinator/app/L1DependentAppTest.kt b/coordinator/app/src/test/kotlin/net/consensys/zkevm/coordinator/app/ConflationAppTest.kt similarity index 89% rename from coordinator/app/src/test/kotlin/net/consensys/zkevm/coordinator/app/L1DependentAppTest.kt rename to coordinator/app/src/test/kotlin/net/consensys/zkevm/coordinator/app/ConflationAppTest.kt index 45304a79..c80a28b4 100644 --- a/coordinator/app/src/test/kotlin/net/consensys/zkevm/coordinator/app/L1DependentAppTest.kt +++ b/coordinator/app/src/test/kotlin/net/consensys/zkevm/coordinator/app/ConflationAppTest.kt @@ -1,5 +1,7 @@ package net.consensys.zkevm.coordinator.app +import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.cleanupDbDataAfterBlockNumbers +import net.consensys.zkevm.coordinator.app.conflation.ConflationAppHelper.resumeConflationFrom import net.consensys.zkevm.persistence.AggregationsRepository import net.consensys.zkevm.persistence.BatchesRepository import net.consensys.zkevm.persistence.BlobsRepository @@ -11,7 +13,7 @@ import org.mockito.Mockito.verify import org.mockito.kotlin.whenever import tech.pegasys.teku.infrastructure.async.SafeFuture -class L1DependentAppTest { +class ConflationAppTest { @Test fun `test resume conflation from uses lastFinalizedBlock + 1 for db queries`() { val aggregationsRepository = mock() @@ -21,7 +23,7 @@ class L1DependentAppTest { .thenReturn(SafeFuture.completedFuture(emptyList())) val lastProcessedBlock = - L1DependentApp.resumeConflationFrom( + resumeConflationFrom( aggregationsRepository, lastFinalizedBlock, ).get() @@ -44,7 +46,7 @@ class L1DependentAppTest { whenever(aggregationsRepository.deleteAggregationsAfterBlockNumber(anyLong())) .thenReturn(SafeFuture.completedFuture(0)) - L1DependentApp.cleanupDbDataAfterBlockNumbers( + cleanupDbDataAfterBlockNumbers( lastProcessedBlockNumber = lastProcessedBlock, lastConsecutiveAggregatedBlockNumber = lastConsecutiveAggregatedBlockNumber, batchesRepository = batchesRepository,