Staterecovery - plugin rename and debug logging (#593)

* staterecovery: Fix renamings and logs, and add e2e test for restarts
Pedro Novais
2025-01-27 15:54:24 +00:00
committed by GitHub
parent 62a2594984
commit 6995d99a1d
24 changed files with 630 additions and 144 deletions

View File

@@ -17,9 +17,6 @@ concurrency:
cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
jobs:
cache-docker-images:
uses: ./.github/workflows/cache-docker-images.yml
secrets: inherit
run-tests:
env:
COMMIT_TAG: ${{ inputs.commit_tag }}
@@ -71,10 +68,20 @@ jobs:
timeout-minutes: 15
run: |
./gradlew state-recovery:test-cases:integrationTest
- name: Run Jacoco Staterecovery
run: |
./gradlew jacocoRootReport
# - name: Setup upterm session
# if: ${{ inputs.e2e-tests-with-ssh }}
# uses: lhotari/action-upterm@v1
# Stack fails to start in CI - Disable for now
# - name: Run E2E tests
# timeout-minutes: 15
# run: |
# mkdir -p tmp/local/
# chmod -R a+w tmp/local/
# ./gradlew state-recovery:test-cases:e2eTest
# TMP disable as it conflicts with coordinator report
# - name: Run Jacoco Staterecovery
# run: |
# ./gradlew jacocoRootReport
# - name: Upload Jacoco report Staterecovery
# uses: actions/upload-artifact@v4
# with:

View File

@@ -29,9 +29,10 @@ clean-testnet-folders:
clean-environment:
docker compose -f docker/compose.yml -f docker/compose-local-dev-traces-v2.overrides.yml --profile l1 --profile l2 --profile debug --profile staterecovery kill -s 9 || true
docker compose -f docker/compose.yml -f docker/compose-local-dev-traces-v2.overrides.yml --profile l1 --profile l2 --profile debug --profile staterecovery down || true
make clean-local-folders
docker network prune -f
docker volume rm linea-local-dev linea-logs || true # ignore failure if volumes do not exist already
docker system prune -f || true
start-l1:
L1_GENESIS_TIME=$(get_future_time) docker compose -f docker/compose.yml -f docker/compose-local-dev.overrides.yml --profile l1 up -d
@@ -50,8 +51,10 @@ start-whole-environment:
start-whole-environment-traces-v2: COMPOSE_PROFILES:=l1,l2
start-whole-environment-traces-v2:
COMPOSE_PROFILES=$(COMPOSE_PROFILES) docker compose -f docker/compose.yml -f docker/compose-local-dev-traces-v2.overrides.yml up -d
@if [ -z "$(L1_GENESIS_TIME)" ]; then \
L1_GENESIS_TIME=$(get_future_time); \
fi; \
L1_GENESIS_TIME=$$L1_GENESIS_TIME COMPOSE_PROFILES=$(COMPOSE_PROFILES) docker compose -f docker/compose.yml -f docker/compose-local-dev-traces-v2.overrides.yml up -d
pull-all-images:
COMPOSE_PROFILES:=l1,l2 docker compose -f docker/compose.yml -f docker/compose-local-dev-traces-v2.overrides.yml pull
@@ -220,18 +223,19 @@ fresh-start-all-staterecovery: COMPOSE_PROFILES:=l1,l2,staterecovery
fresh-start-all-staterecovery: L1_CONTRACT_VERSION:=6
fresh-start-all-staterecovery:
make clean-environment
L1_GENESIS_TIME=$(get_future_time) make start-whole-environment-traces-v2 COMPOSE_PROFILES=$(COMPOSE_PROFILES)
make start-whole-environment-traces-v2 COMPOSE_PROFILES=$(COMPOSE_PROFILES)
$(MAKE) deploy-contracts-minimal L1_CONTRACT_VERSION=$(L1_CONTRACT_VERSION)
fresh-start-staterecovery-for-replay-only: COMPOSE_PROFILES:=l1,staterecovery
fresh-start-staterecovery-for-replay-only:
make clean-environment
L1_GENESIS_TIME=$(get_future_time) make start-whole-environment-traces-v2 COMPOSE_PROFILES=$(COMPOSE_PROFILES)
make start-whole-environment-traces-v2 COMPOSE_PROFILES=$(COMPOSE_PROFILES)
staterecovery-replay-from-genesis: L1_ROLLUP_CONTRACT_ADDRESS:=0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9
staterecovery-replay-from-genesis:
staterecovery-replay-from-block: L1_ROLLUP_CONTRACT_ADDRESS:=0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9
staterecovery-replay-from-block: PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER:=1
staterecovery-replay-from-block:
docker compose -f docker/compose.yml down zkbesu-shomei-sr shomei-sr
L1_ROLLUP_CONTRACT_ADDRESS=$(L1_ROLLUP_CONTRACT_ADDRESS) docker compose -f docker/compose.yml up zkbesu-shomei-sr shomei-sr -d
L1_ROLLUP_CONTRACT_ADDRESS=$(L1_ROLLUP_CONTRACT_ADDRESS) PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER=$(PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER) docker compose -f docker/compose.yml up zkbesu-shomei-sr shomei-sr -d
testnet-start-l2:
docker compose -f docker/compose.yml -f docker/compose-testnet-sync.overrides.yml --profile l2 up -d

View File

@@ -179,7 +179,7 @@ dockerCompose {
projectName = "docker"
}
localStackForStateRecover {
localStackForStateRecovery {
startedServices = [
"postgres",
"sequencer",
@@ -189,9 +189,6 @@ dockerCompose {
"blobscan-api",
"blobscan-indexer",
"redis",
// For debug
// "l1-blockscout",
// "l2-blockscout"
]
composeAdditionalArgs = [
"--profile",

View File

@@ -3,6 +3,7 @@ package linea.testing
import net.consensys.linea.async.toSafeFuture
import net.consensys.linea.testing.filesystem.getPathTo
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.io.BufferedReader
import java.io.File
@@ -12,9 +13,15 @@ import kotlin.time.Duration.Companion.minutes
data class CommandResult(
val exitCode: Int,
val stdOut: List<String>,
val stdErr: List<String>
)
val stdOutLines: List<String>,
val stdErrLines: List<String>
) {
val isSuccess: Boolean = exitCode == 0
val stdOutStr: String
get() = stdOutLines.joinToString("\n")
val stdErrStr: String
get() = stdErrLines.joinToString("\n")
}
object Runner {
@@ -22,9 +29,9 @@ object Runner {
command: String,
envVars: Map<String, String> = emptyMap(),
executionDir: File = getPathTo("Makefile").parent.toFile(),
timeout: Duration = 1.minutes
timeout: Duration = 1.minutes,
log: Logger = LogManager.getLogger(Runner::class.java)
): SafeFuture<CommandResult> {
val log = LogManager.getLogger("net.consensys.zkevm.ethereum.CommandExecutor")
val processBuilder = ProcessBuilder("/bin/sh", "-c", command)
processBuilder.directory(executionDir)
@@ -75,4 +82,24 @@ object Runner {
return futureResult.toSafeFuture()
}
fun executeCommandFailOnNonZeroExitCode(
command: String,
envVars: Map<String, String> = emptyMap(),
executionDir: File = getPathTo("Makefile").parent.toFile(),
timeout: Duration = 1.minutes,
log: Logger = LogManager.getLogger(Runner::class.java)
): SafeFuture<CommandResult> {
return executeCommand(command, envVars, executionDir, timeout, log)
.thenCompose { execResult ->
if (!execResult.isSuccess) {
val errorMessage = "command='$command' failed with exitCode=${execResult.exitCode} " +
"STDERR=${execResult.stdErrStr}"
log.debug(errorMessage)
SafeFuture.failedFuture(RuntimeException(errorMessage))
} else {
SafeFuture.completedFuture(execResult)
}
}
}
}

View File

@@ -23,7 +23,7 @@ fun getDeployedAddress(
commandResult: CommandResult,
addressPattern: Pattern
): DeployedContract {
val lines = commandResult.stdOut.toList().asReversed()
val lines = commandResult.stdOutLines.toList().asReversed()
return getDeployedAddress(lines, addressPattern)
}
@@ -55,8 +55,8 @@ private fun deployContract(
throw IllegalStateException(
"Command $command failed: " +
"\nexitCode=${result.exitCode} " +
"\nSTD_OUT: \n${result.stdOut.joinToString("\n")}" +
"\nSTD_ERROR: \n${result.stdErr.joinToString("\n")}"
"\nSTD_OUT: \n${result.stdOutStr}" +
"\nSTD_ERROR: \n${result.stdErrStr}"
)
} else {
runCatching { getDeployedAddress(result, addressPattern) }
@@ -107,9 +107,9 @@ fun makeDeployL2MessageService(
fun logCommand(commandResult: CommandResult) {
println("stdout:")
commandResult.stdOut.forEach { println(it) }
println(commandResult.stdOutStr)
println("stderr:")
commandResult.stdErr.forEach { println(it) }
println(commandResult.stdErrStr)
println("exit code: ${commandResult.exitCode}")
}

View File

@@ -726,17 +726,18 @@ services:
environment:
LOG4J_CONFIGURATION_FILE: /var/lib/besu/log4j.xml
L1_ROLLUP_CONTRACT_ADDRESS: ${L1_ROLLUP_CONTRACT_ADDRESS:-0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9}
PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER: ${PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER:-1}
entrypoint:
- /bin/bash
- -c
- |
rm /opt/besu/plugins/linea-staterecovery-besu-plugin-v* && \
(rm /opt/besu/plugins/linea-staterecovery-besu-plugin-v* || true) && \
ls -lh /opt/besu/plugins && \
sed -i '/^CLASSPATH/c\CLASSPATH=/opt/besu/lib/\*\:/opt/besu/plugins/\*' /opt/besu/bin/besu && \
/opt/besu/bin/besu \
--config-file=/var/lib/besu/zkbesu-config.toml \
--genesis-file=/var/lib/besu/genesis.json \
--plugins=BesuShomeiRpcPlugin,ZkTrieLogPlugin,LineaStateRecoverPlugin \
--plugins=BesuShomeiRpcPlugin,ZkTrieLogPlugin,LineaStateRecoveryPlugin \
--rpc-http-api=ADMIN,DEBUG,NET,ETH,WEB3,PLUGINS,MINER,SHOMEI \
--Xbonsai-limit-trie-logs-enabled=false \
--plugin-shomei-http-host="11.11.11.117" \
@@ -745,8 +746,8 @@ services:
--plugin-staterecovery-shomei-endpoint=http://shomei-sr:8888 \
--plugin-staterecovery-blobscan-endpoint=http://blobscan-api:4001 \
--plugin-staterecovery-linea-sequencer-beneficiary-address=0x6d976c9b8ceee705d4fe8699b44e5eb58242f484 \
--plugin-staterecovery-overriding-recovery-start-block-number=1 \
--plugin-staterecovery-l1-polling-interval=PT0.1S
--plugin-staterecovery-l1-polling-interval=PT0.1S \
--bootnodes=enode://14408801a444dafc44afbccce2eb755f902aed3b5743fed787b3c790e021fef28b8c827ed896aa4e8fb46e22bd67c39f994a73768b4b382f8597b0d44370e15d@11.11.11.101:30303
volumes:
- ./config/zkbesu-shomei/zkbesu-config.toml:/var/lib/besu/zkbesu-config.toml:ro
- ./config/zkbesu-shomei/log4j-staterecovery.xml:/var/lib/besu/log4j.xml:ro

View File

@@ -15,7 +15,7 @@
<Logger name="org.hyperledger.besu" level="WARN" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="net.consensys.shomei.trielog" level="TRACE" additivity="false">
<Logger name="net.consensys.shomei.trielog" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<!-- to avoid annoying message "INFO ... No sync target, waiting for peers. Current peers: 0" change to WARN-->
@@ -34,7 +34,10 @@
<Logger name="io.opentelemetry" level="WARN" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="linea.staterecovery" level="INFO" additivity="false">
<Logger name="linea.staterecovery" level="DEBUG" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="linea.staterecovery.plugin.BlockImporter" level="DEBUG" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="linea.staterecovery.clients.ExecutionLayerInProcessClient" level="DEBUG" additivity="false">

View File

@@ -83,7 +83,7 @@ class BlobDecompressorToDomainV1(
): SafeFuture<List<BlockFromL1RecoveredData>> {
var blockNumber = startBlockNumber
val startTime = Clock.System.now()
logger.debug("start decompressing blobs: startBlockNumber={} {} blobs", startBlockNumber, blobs.size)
logger.trace("start decompressing blobs: startBlockNumber={} {} blobs", startBlockNumber, blobs.size)
val decompressedBlobs = blobs.map { decompressor.decompress(it) }
return SafeFuture
.collectAll(decompressedBlobs.map(::decodeBlocksAsync).stream())

View File

@@ -17,7 +17,7 @@ import java.util.concurrent.CompletableFuture
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
class StateRecoverApp(
class StateRecoveryApp(
private val vertx: Vertx,
// Driving Ports
private val lineaContractClient: LineaRollupSmartContractClientReadOnly,

View File

@@ -70,6 +70,13 @@ class StateSynchronizerService(
}
return findNextFinalization()
.thenPeek { nextFinalization ->
log.debug(
"sync state loop: lastSuccessfullyProcessedFinalization={} nextFinalization={}",
lastSuccessfullyProcessedFinalization?.event?.intervalString(),
nextFinalization?.event?.intervalString()
)
}
.thenCompose { nextFinalization ->
if (nextFinalization == null) {
// nothing to do for now

View File

@@ -8,6 +8,7 @@ import linea.staterecovery.plugin.BlockImporter
import linea.staterecovery.plugin.RecoveryModeManager
import net.consensys.linea.BlockNumberAndHash
import net.consensys.linea.BlockParameter
import net.consensys.linea.CommonDomainFunctions
import org.apache.logging.log4j.LogManager
import org.hyperledger.besu.plugin.data.BlockHeader
import org.hyperledger.besu.plugin.services.BlockSimulationService
@@ -104,7 +105,10 @@ class ExecutionLayerInProcessClient(
if (log.isTraceEnabled) {
log.trace("importing blocks from blob: blocks={}", blocks)
} else {
log.debug("importing blocks from blob: blocks={}", blocks.map { it.header.blockNumber })
log.debug(
"importing blocks from blob: blocks={}",
CommonDomainFunctions.blockIntervalString(blocks.first().header.blockNumber, blocks.last().header.blockNumber)
)
}
}
}

View File

@@ -8,7 +8,7 @@ import io.vertx.core.Vertx
import io.vertx.micrometer.backends.BackendRegistries
import linea.staterecovery.BlockHeaderStaticFields
import linea.staterecovery.ExecutionLayerClient
import linea.staterecovery.StateRecoverApp
import linea.staterecovery.StateRecoveryApp
import linea.staterecovery.TransactionDetailsClient
import linea.staterecovery.clients.VertxTransactionDetailsClient
import linea.staterecovery.clients.blobscan.BlobScanClient
@@ -29,8 +29,8 @@ fun createAppAllInProcess(
l1RpcEndpoint: URI,
blobScanEndpoint: URI,
blockHeaderStaticFields: BlockHeaderStaticFields,
appConfig: StateRecoverApp.Config
): StateRecoverApp {
appConfig: StateRecoveryApp.Config
): StateRecoveryApp {
val lineaContractClient = Web3JLineaRollupSmartContractClientReadOnly(
contractAddress = appConfig.smartContractAddress,
web3j = createWeb3jHttpClient(
@@ -78,7 +78,7 @@ fun createAppAllInProcess(
logger = LogManager.getLogger("linea.plugin.staterecover.clients.l1.transaction-details")
)
val app = StateRecoverApp(
val app = StateRecoveryApp(
vertx = vertx,
lineaContractClient = lineaContractClient,
ethLogsSearcher = ethLogsSearcher,

View File

@@ -33,7 +33,7 @@ class BlockImporter(
private fun executeBlockWithTransactionsWithoutSignature(
block: BlockFromL1RecoveredData
): PluginBlockSimulationResult {
log.debug(
log.trace(
"simulating import block={} blockHash={}",
block.header.blockNumber,
block.header.blockHash.encodeHex()
@@ -52,7 +52,7 @@ class BlockImporter(
StateOverrideMap()
)
log.debug(
log.trace(
" import simulation result: block={} blockHeader={}",
executedBlockResult.blockHeader.number,
executedBlockResult.blockHeader
@@ -73,7 +73,7 @@ class BlockImporter(
}
fun importBlock(context: BlockContext): PluginBlockSimulationResult {
log.debug(
log.trace(
"calling simulateAndPersistWorldState block={} blockHeader={}",
context.blockHeader.number,
context.blockHeader
@@ -86,7 +86,7 @@ class BlockImporter(
createOverrides(context.blockHeader),
StateOverrideMap()
)
log.debug(
log.trace(
"simulateAndPersistWorldState result: block={} blockHeader={}",
context.blockHeader.number,
importedBlockResult.blockHeader

View File

@@ -5,7 +5,7 @@ import io.vertx.core.Vertx
import linea.staterecovery.BlockHeaderStaticFields
import linea.staterecovery.FileBasedRecoveryStatusPersistence
import linea.staterecovery.RecoveryStatusPersistence
import linea.staterecovery.StateRecoverApp
import linea.staterecovery.StateRecoveryApp
import linea.staterecovery.clients.ExecutionLayerInProcessClient
import net.consensys.linea.async.get
import org.apache.logging.log4j.LogManager
@@ -27,14 +27,14 @@ fun <T : BesuService> ServiceManager.getServiceOrThrow(clazz: Class<T>): T {
.orElseThrow { IllegalStateException("${clazz.name} is not present in BesuContext") }
}
open class LineaStateRecoverPlugin : BesuPlugin {
private val log: Logger = LogManager.getLogger(LineaStateRecoverPlugin::class.java)
open class LineaStateRecoveryPlugin : BesuPlugin {
private val log: Logger = LogManager.getLogger(LineaStateRecoveryPlugin::class.java)
private val vertx = Vertx.vertx()
private val cliOptions = PluginCliOptions()
private lateinit var serviceManager: ServiceManager
private lateinit var recoveryModeManager: RecoveryModeManager
private lateinit var recoveryStatusPersistence: RecoveryStatusPersistence
private lateinit var stateRecoverApp: StateRecoverApp
private lateinit var stateRecoverApp: StateRecoveryApp
override fun register(serviceManager: ServiceManager) {
log.debug("registering")
@@ -70,7 +70,8 @@ open class LineaStateRecoverPlugin : BesuPlugin {
p2pService = serviceManager.getServiceOrThrow(P2PService::class.java),
miningService = serviceManager.getServiceOrThrow(MiningService::class.java),
recoveryStatePersistence = this.recoveryStatusPersistence,
synchronizationService = synchronizationService
synchronizationService = synchronizationService,
headBlockNumber = blockchainService.chainHeadHeader.number.toULong()
)
val simulatorService = serviceManager.getServiceOrThrow(BlockSimulationService::class.java)
val executionLayerClient = ExecutionLayerInProcessClient.create(
@@ -91,7 +92,7 @@ open class LineaStateRecoverPlugin : BesuPlugin {
l1RpcEndpoint = config.l1RpcEndpoint,
blobScanEndpoint = config.blobscanEndpoint,
blockHeaderStaticFields = blockHeaderStaticFields,
appConfig = StateRecoverApp.Config(
appConfig = StateRecoveryApp.Config(
smartContractAddress = config.l1SmartContractAddress.toString(),
l1LatestSearchBlock = net.consensys.linea.BlockParameter.Tag.LATEST,
overridingRecoveryStartBlockNumber = config.overridingRecoveryStartBlockNumber,

View File

@@ -79,6 +79,7 @@ class PluginCliOptions {
"Tries to force the recovery start block number to the given value. " +
"This is mean for testing purposes, not production. Must be greater than or equal to 1."
],
defaultValue = "\${env:PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER}",
required = false
)
var overridingRecoveryStartBlockNumber: Long? = null

View File

@@ -14,17 +14,28 @@ class RecoveryModeManager(
private val synchronizationService: SynchronizationService,
private val p2pService: P2PService,
private val miningService: MiningService,
private val recoveryStatePersistence: RecoveryStatusPersistence
private val recoveryStatePersistence: RecoveryStatusPersistence,
headBlockNumber: ULong
) :
BesuEvents.BlockAddedListener {
private val log: Logger = LogManager.getLogger(RecoveryModeManager::class.java.name)
private val recoveryModeTriggered = AtomicBoolean(false)
private var currentBlockNumber: ULong = 0u
val targetBlockNumber: ULong?
get() = recoveryStatePersistence.getRecoveryStartBlockNumber()
var headBlockNumber: ULong = headBlockNumber
private set
val headBlockNumber: ULong
get() = currentBlockNumber
init {
log.info("RecoveryModeManager initializing: headBlockNumber={}", headBlockNumber)
if (hasReachedTargetBlock()) {
log.info(
"enabling recovery mode immediately at blockNumber={} recoveryTargetBlockNumber={}",
headBlockNumber,
targetBlockNumber
)
switchToRecoveryMode()
}
}
/**
* Called when a block is added.
@@ -34,14 +45,14 @@ class RecoveryModeManager(
@Synchronized
override fun onBlockAdded(addedBlockContext: AddedBlockContext) {
val blockNumber = addedBlockContext.blockHeader.number
currentBlockNumber = blockNumber.toULong()
headBlockNumber = blockNumber.toULong()
if (!recoveryModeTriggered.get() && hasReachedTargetBlock()) {
switchToRecoveryMode()
}
}
private fun hasReachedTargetBlock(): Boolean {
return currentBlockNumber >= ((targetBlockNumber ?: ULong.MAX_VALUE) - 1u)
return (headBlockNumber + 1u) >= (targetBlockNumber ?: ULong.MAX_VALUE)
}
/**
@@ -51,20 +62,28 @@ class RecoveryModeManager(
*/
@Synchronized
fun setTargetBlockNumber(targetBlockNumber: ULong) {
check(!recoveryModeTriggered.get()) {
"Cannot set target block number after recovery mode has been triggered"
if (recoveryModeTriggered.get()) {
if (targetBlockNumber == this.targetBlockNumber) {
log.info("recovery mode already enabled at blockNumber={}", headBlockNumber)
return
} else {
check(!recoveryModeTriggered.get()) {
"recovery mode has already been triggered at block=${this.targetBlockNumber} " +
"trying new target=$targetBlockNumber"
}
}
}
val effectiveRecoveryStartBlockNumber = if (targetBlockNumber <= currentBlockNumber + 1u) {
log.warn(
"targetBlockNumber={} is less than or equal to headBlockNumber={}" +
" enabling recovery mode immediately at blockNumber={}",
targetBlockNumber,
currentBlockNumber,
currentBlockNumber + 1u
val effectiveRecoveryStartBlockNumber = if (targetBlockNumber <= headBlockNumber + 1u) {
val effectiveRecoveryStartBlockNumber = headBlockNumber + 1u
log.warn(
"enabling recovery mode immediately at blockNumber={} recoveryTargetBlockNumber={} headBlockNumber={}",
effectiveRecoveryStartBlockNumber,
targetBlockNumber,
headBlockNumber
)
switchToRecoveryMode()
currentBlockNumber + 1u
effectiveRecoveryStartBlockNumber
} else {
targetBlockNumber
}

View File

@@ -1 +1 @@
linea.staterecovery.plugin.LineaStateRecoverPlugin
linea.staterecovery.plugin.LineaStateRecoveryPlugin

View File

@@ -21,6 +21,7 @@ dependencies {
api(project(':state-recovery:appcore:logic'))
implementation project(':jvm-libs:linea:besu-libs')
implementation project(':jvm-libs:linea:testing:file-system')
implementation(testFixtures(project(':jvm-libs:generic:json-rpc')))
implementation(project(':state-recovery:clients:eth-api'))
implementation(project(':state-recovery:clients:blobscan-client'))
@@ -32,6 +33,7 @@ dependencies {
implementation(project(":coordinator:ethereum:test-utils"))
implementation(project(":jvm-libs:linea:testing:l1-blob-and-proof-submission"))
implementation(testFixtures(project(":jvm-libs:linea:blob-compressor")))
implementation(testFixtures(project(':jvm-libs:linea:web3j-extensions')))
testImplementation("io.vertx:vertx-junit5")
testImplementation(project(":jvm-libs:linea:metrics:micrometer"))
}
@@ -39,11 +41,19 @@ dependencies {
sourceSets {
integrationTest {
kotlin {
compileClasspath += sourceSets.main.output + sourceSets.main.compileClasspath + sourceSets.test.compileClasspath
runtimeClasspath += sourceSets.main.output + sourceSets.main.runtimeClasspath + sourceSets.test.runtimeClasspath
compileClasspath += sourceSets.main.output + sourceSets.main.compileClasspath + sourceSets.test.output + sourceSets.test.compileClasspath
runtimeClasspath += sourceSets.main.output + sourceSets.main.runtimeClasspath + sourceSets.test.output + sourceSets.test.runtimeClasspath
}
compileClasspath += sourceSets.main.output + sourceSets.main.compileClasspath + sourceSets.test.compileClasspath
runtimeClasspath += sourceSets.main.output + sourceSets.main.runtimeClasspath + sourceSets.test.runtimeClasspath
compileClasspath += sourceSets.main.output + sourceSets.main.compileClasspath + sourceSets.test.output + sourceSets.test.compileClasspath
runtimeClasspath += sourceSets.main.output + sourceSets.main.runtimeClasspath + sourceSets.test.output + sourceSets.test.runtimeClasspath
}
e2eTest {
kotlin {
compileClasspath += sourceSets.main.output + sourceSets.main.compileClasspath + sourceSets.test.output + sourceSets.test.compileClasspath
runtimeClasspath += sourceSets.main.output + sourceSets.main.runtimeClasspath + sourceSets.test.output + sourceSets.test.runtimeClasspath
}
compileClasspath += sourceSets.main.output + sourceSets.main.compileClasspath + sourceSets.test.output + sourceSets.test.compileClasspath
runtimeClasspath += sourceSets.main.output + sourceSets.main.runtimeClasspath + sourceSets.test.output + sourceSets.test.runtimeClasspath
}
}
@@ -65,7 +75,41 @@ task integrationTest(type: Test) { test ->
classpath = sourceSets.integrationTest.runtimeClasspath
testClassesDirs = sourceSets.integrationTest.output.classesDirs
dependsOn(":localStackForStateRecoverComposeUp")
dependsOn(":localStackForStateRecoveryComposeUp")
testLogging {
events TestLogEvent.FAILED,
TestLogEvent.SKIPPED,
TestLogEvent.STANDARD_ERROR,
TestLogEvent.STARTED,
TestLogEvent.PASSED
exceptionFormat TestExceptionFormat.FULL
showCauses true
showExceptions true
showStackTraces true
// set showStandardStreams if you need to see test logs
showStandardStreams true
}
}
task e2eTest(type: Test) { test ->
outputs.cacheIf { false }
systemProperty "vertx.parameter.filename", project.projectDir.toPath()
.resolve("src/test/resources/vertx-options.json")
.toAbsolutePath().toString()
description = "Runs e2e tests."
group = "verification"
useJUnitPlatform()
classpath = sourceSets.e2eTest.runtimeClasspath
testClassesDirs = sourceSets.e2eTest.output.classesDirs
systemProperties["junit.jupiter.execution.timeout.default"] = "5 m" // 5 minutes
systemProperties["junit.jupiter.execution.parallel.enabled"] = false
systemProperties["junit.jupiter.execution.parallel.mode.default"] = "concurrent"
systemProperties["junit.jupiter.execution.parallel.mode.classes.default"] = "concurrent"
maxParallelForks = Math.max(Runtime.runtime.availableProcessors(), 9)
testLogging {
events TestLogEvent.FAILED,

View File

@@ -0,0 +1,282 @@
package linea.staterecovery
import build.linea.clients.StateManagerClientV1
import build.linea.clients.StateManagerV1JsonRpcClient
import build.linea.domain.EthLogEvent
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import io.vertx.core.Vertx
import io.vertx.junit5.VertxExtension
import kotlinx.datetime.Clock
import linea.domain.RetryConfig
import linea.log4j.configureLoggers
import linea.staterecovery.test.assertBesuAndShomeiStateRootMatches
import linea.staterecovery.test.execCommandAndAssertSuccess
import linea.staterecovery.test.getFinalizationsOnL1
import linea.staterecovery.test.getLastFinalizationOnL1
import linea.staterecovery.test.waitExecutionLayerToBeUpAndRunning
import linea.testing.Runner
import linea.web3j.Web3JLogsSearcher
import linea.web3j.createWeb3jHttpClient
import linea.web3j.waitForTxReceipt
import net.consensys.gwei
import net.consensys.linea.jsonrpc.client.RequestRetryConfig
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.linea.testing.filesystem.getPathTo
import net.consensys.toBigInteger
import net.consensys.toULong
import net.consensys.zkevm.ethereum.L2AccountManager
import net.consensys.zkevm.ethereum.Web3jClientManager
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.net.URI
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@ExtendWith(VertxExtension::class)
class StateRecoveryE2ETest {
private val log = LogManager.getLogger("test.case.StateRecoverAppWithLocalStackIntTest")
private lateinit var stateManagerClient: StateManagerClientV1
private val executionLayerUrl = "http://localhost:9145"
private val stateManagerUrl = "http://localhost:8890"
@BeforeEach
fun beforeEach(vertx: Vertx) {
val jsonRpcFactory = VertxHttpJsonRpcClientFactory(
vertx = vertx,
metricsFacade = MicrometerMetricsFacade(SimpleMeterRegistry())
)
stateManagerClient = StateManagerV1JsonRpcClient.create(
rpcClientFactory = jsonRpcFactory,
endpoints = listOf(URI(stateManagerUrl)),
maxInflightRequestsPerClient = 1U,
requestRetry = RequestRetryConfig(
backoffDelay = 10.milliseconds,
timeout = 2.seconds
),
zkStateManagerVersion = "2.3.0",
logger = LogManager.getLogger("test.clients.l1.state-manager")
)
configureLoggers(
rootLevel = Level.INFO,
log.name to Level.DEBUG,
"net.consensys.linea.contract.Web3JContractAsyncHelper" to Level.WARN,
"test.clients.l1.executionlayer" to Level.INFO,
"test.clients.l1.web3j-default" to Level.INFO,
"test.clients.l1.state-manager" to Level.DEBUG,
"test.clients.l1.transaction-details" to Level.INFO,
"test.clients.l1.linea-contract" to Level.INFO,
"test.clients.l1.events-fetcher" to Level.INFO,
"test.clients.l1.blobscan" to Level.INFO,
"net.consensys.linea.contract.l1" to Level.INFO
)
}
private fun freshStartOfStack() {
log.debug("restarting the stack")
Runner.executeCommandFailOnNonZeroExitCode(
command = "make fresh-start-all-staterecovery",
envVars = mapOf(
"L1_GENESIS_TIME" to Clock.System.now().plus(5.seconds).epochSeconds.toString()
),
timeout = 2.minutes
).get()
log.debug("stack restarted")
}
@Test
fun `should recover from middle of chain and be resilient to node restarts`(
vertx: Vertx
) {
// Part A:
// we shall have multiple finalizations on L1
// restart FRESH (empty state) Besu & Shomei with recovery block somewhere in the middle of those finalizations
// will partially sync through P2P network
// then will trigger recovery mode and sync the remaining blocks
// assert Besu and Shomei are in sync
// Part B:
// send some txs to L2 to trigger coordinator to finalize on L1
// wait for at least 1 more finalization on L1
// assert Besu and Shomei are in sync
// Part C:
// restart zkbesu-node only
// send some txs to L2 to trigger coordinator to finalize on L1
// wait for at least 1 more finalization on L1
// assert Besu and Shomei are in sync
freshStartOfStack()
// No Errors should be logged in Besu
assertThat(getBesuErrorLogs()).isEmpty()
val localStackL1ContractAddress = "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9"
val logsSearcher = Web3JLogsSearcher(
vertx = vertx,
web3jClient = Web3jClientManager.buildL1Client(
log = LogManager.getLogger("test.clients.l1.events-fetcher"),
requestResponseLogLevel = Level.TRACE,
failuresLogLevel = Level.WARN
),
Web3JLogsSearcher.Config(
backoffDelay = 1.milliseconds,
requestRetryConfig = RetryConfig.noRetries
),
log = LogManager.getLogger("test.clients.l1.events-fetcher")
)
val web3jElClient = createWeb3jHttpClient(executionLayerUrl)
log.info("starting test flow: besu staterecovery block={}", web3jElClient.ethBlockNumber().send().blockNumber)
// generate some activity on L2 for coordinator to finalize on L1
val keepSendingTxToL2 = AtomicBoolean(true)
sendTxToL2(keepSendingTxToL2::get)
// A
// await for 3 finalizations to happen on L1
val lastFinalizationA = run {
var finalizationLogs: List<EthLogEvent<DataFinalizedV3>> = emptyList()
await()
.atMost(2.minutes.toJavaDuration())
.untilAsserted {
finalizationLogs = getFinalizationsOnL1(logsSearcher, localStackL1ContractAddress)
assertThat(finalizationLogs.size).isGreaterThan(2)
}
finalizationLogs.last()
}
log.info("lastFinalizationA={}", lastFinalizationA.event.intervalString())
// await for coordinator to finalize on L1 at least once
val stateRecoveryStartBlockNumber = lastFinalizationA.event.startBlockNumber - 2UL
log.info("restarting Besu+Shomei for recovery of state pushed to L1 by coordinator")
execCommandAndAssertSuccess(
command = "make staterecovery-replay-from-block " +
"L1_ROLLUP_CONTRACT_ADDRESS=$localStackL1ContractAddress " +
"PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER=$stateRecoveryStartBlockNumber",
log = log
).get()
// No Errors should be logged in Besu
assertThat(getBesuErrorLogs()).isEmpty()
// wait for Besu to be up and running
waitExecutionLayerToBeUpAndRunning(executionLayerUrl, log = log)
// assert besu and shomei could sync through P2P network
assertBesuAndShomeiStateRootMatches(
web3jElClient,
stateManagerClient,
lastFinalizationA.event.endBlockNumber,
lastFinalizationA.event.finalStateRootHash
)
// No Errors should be logged in Besu
assertThat(getBesuErrorLogs()).isEmpty()
// B
// await for coordinator to finalize on L1 again
await()
.atMost(2.minutes.toJavaDuration())
.untilAsserted {
getLastFinalizationOnL1(logsSearcher, localStackL1ContractAddress)
.also { finalization ->
assertThat(finalization.event.startBlockNumber)
.isGreaterThan(lastFinalizationA.event.endBlockNumber)
}
}
val lastFinalizationB = getLastFinalizationOnL1(logsSearcher, localStackL1ContractAddress)
log.info("lastFinalizationB={}", lastFinalizationB.event.intervalString())
assertBesuAndShomeiStateRootMatches(
web3jElClient,
stateManagerClient,
lastFinalizationB.event.endBlockNumber,
lastFinalizationB.event.finalStateRootHash
)
// No Errors should be logged in Besu
assertThat(getBesuErrorLogs()).isEmpty()
// C.
// restart besu node with non-graceful shutdown
log.info("Restarting zkbesu-shomei node")
execCommandAndAssertSuccess(
command = "docker restart -s 9 zkbesu-shomei-sr",
log = log
).get()
// No Errors should be logged in Besu
assertThat(getBesuErrorLogs()).isEmpty()
waitExecutionLayerToBeUpAndRunning(executionLayerUrl, log = log)
val lastRecoveredBlock = web3jElClient.ethBlockNumber().send().blockNumber.toULong()
// await coordinator to finalize on L1, beyond what's already in sync
await()
.atMost(2.minutes.toJavaDuration())
.untilAsserted {
getLastFinalizationOnL1(logsSearcher, localStackL1ContractAddress)
.also { lastFinalizationC ->
assertThat(lastFinalizationC.event.startBlockNumber)
.isGreaterThan(lastFinalizationB.event.endBlockNumber)
assertThat(lastFinalizationC.event.endBlockNumber).isGreaterThan(lastRecoveredBlock)
}
}
keepSendingTxToL2.set(false)
val lastFinalizationC = getLastFinalizationOnL1(logsSearcher, localStackL1ContractAddress)
log.info("lastFinalizationC={}", lastFinalizationC.event.intervalString())
assertBesuAndShomeiStateRootMatches(
web3jElClient,
stateManagerClient,
lastFinalizationC.event.endBlockNumber,
lastFinalizationC.event.finalStateRootHash
)
// No Errors should be logged in Besu
assertThat(getBesuErrorLogs()).isEmpty()
}
private fun sendTxToL2(
keepSendingPredicate: () -> Boolean
) {
val account = L2AccountManager.generateAccount()
val txManager = L2AccountManager.getTransactionManager(account)
Thread {
while (keepSendingPredicate()) {
val txHash = txManager.sendTransaction(
/*gasPrice*/ 150UL.gwei.toBigInteger(),
/*gasLimit*/ 25_000UL.toBigInteger(),
/*to*/ account.address,
/*data*/ "",
/*value*/ 1UL.toBigInteger()
).transactionHash
log.trace("sent tx to L2, txHash={}", txHash)
Web3jClientManager.l2Client.waitForTxReceipt(
txHash = txHash,
timeout = 5.seconds,
pollingInterval = 500.milliseconds
)
}
}.start()
}
private fun getBesuErrorLogs(): List<String> {
val tmpLogDir = getPathTo("tmp/local").resolve("test-logs")
if (!Files.exists(tmpLogDir)) {
Files.createDirectory(tmpLogDir)
}
val tmpFile = tmpLogDir.resolve("zkbesu-shomei-sr-e2e-test.logs")
// We need this workaround because the Java native implementation hangs if STDOUT is too long
Runner
.executeCommandFailOnNonZeroExitCode("docker logs zkbesu-shomei-sr > ${tmpFile.toAbsolutePath()}")
.get()
val errorLogs = Files.readAllLines(tmpFile).filter { it.contains("ERROR", ignoreCase = true) }
Files.delete(tmpFile)
return errorLogs
}
}

View File

@@ -44,7 +44,7 @@ import kotlin.time.toJavaDuration
@ExtendWith(VertxExtension::class)
class StateRecoveryAppWithFakeExecutionClientIntTest {
private val log = LogManager.getLogger("test.case.StateRecoverAppWithFakeExecutionClientIntTest")
private lateinit var stateRecoverApp: StateRecoverApp
private lateinit var stateRecoverApp: StateRecoveryApp
private lateinit var aggregationsAndBlobs: List<AggregationAndBlobs>
private lateinit var executionLayerClient: FakeExecutionLayerClient
private lateinit var fakeStateManagerClient: FakeStateManagerClient
@@ -129,7 +129,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
logger = LogManager.getLogger("test.clients.l1.blobscan")
)
stateRecoverApp = StateRecoverApp(
stateRecoverApp = StateRecoveryApp(
vertx = vertx,
elClient = executionLayerClient,
blobFetcher = blobScanClient,
@@ -138,7 +138,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
transactionDetailsClient = transactionDetailsClient,
blockHeaderStaticFields = BlockHeaderStaticFields.localDev,
lineaContractClient = lineaContractClient,
config = StateRecoverApp.Config(
config = StateRecoveryApp.Config(
l1LatestSearchBlock = BlockParameter.Tag.LATEST,
l1PollingInterval = 10.milliseconds,
executionClientPollingInterval = 1.seconds,

View File

@@ -3,21 +3,21 @@ package linea.staterecovery
import build.linea.clients.StateManagerClientV1
import build.linea.clients.StateManagerV1JsonRpcClient
import build.linea.contract.l1.LineaContractVersion
import build.linea.domain.BlockInterval
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import io.vertx.core.Vertx
import io.vertx.junit5.VertxExtension
import linea.log4j.configureLoggers
import linea.testing.Runner
import linea.staterecovery.test.assertBesuAndShomeiStateRootMatches
import linea.staterecovery.test.execCommandAndAssertSuccess
import linea.staterecovery.test.waitExecutionLayerToBeUpAndRunning
import linea.web3j.createWeb3jHttpClient
import net.consensys.linea.BlockParameter
import net.consensys.linea.jsonrpc.client.RequestRetryConfig
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.linea.testing.submission.AggregationAndBlobs
import net.consensys.linea.testing.submission.loadBlobsAndAggregationsSortedAndGrouped
import net.consensys.linea.testing.submission.submitBlobsAndAggregationsAndWaitExecution
import net.consensys.toULong
import net.consensys.zkevm.coordinator.clients.smartcontract.LineaRollupSmartContractClient
import net.consensys.zkevm.ethereum.ContractsManager
import net.consensys.zkevm.ethereum.LineaRollupDeploymentResult
import net.consensys.zkevm.ethereum.MakeFileDelegatedContractsManager.connectToLineaRollupContract
@@ -25,24 +25,29 @@ import net.consensys.zkevm.ethereum.MakeFileDelegatedContractsManager.lineaRollu
import net.consensys.zkevm.ethereum.Web3jClientManager
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.web3j.protocol.Web3j
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.net.URI
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@ExtendWith(VertxExtension::class)
class StateRecoveryWithRealBesuAndStateManagerIntTest {
private val log = LogManager.getLogger("test.case.StateRecoverAppWithLocalStackIntTest")
private lateinit var stateManagerClient: StateManagerClientV1
private val testDataDir = "testdata/coordinator/prover/v3"
private val aggregationsAndBlobs: List<AggregationAndBlobs> = loadBlobsAndAggregationsSortedAndGrouped(
blobsResponsesDir = "$testDataDir/compression/responses",
aggregationsResponsesDir = "$testDataDir/aggregation/responses"
)
private lateinit var rollupDeploymentResult: LineaRollupDeploymentResult
private lateinit var contractClientForBlobSubmission: LineaRollupSmartContractClient
private lateinit var contractClientForAggregationSubmission: LineaRollupSmartContractClient
private val executionLayerUrl = "http://localhost:9145"
private val stateManagerUrl = "http://localhost:8890"
@@ -64,14 +69,10 @@ class StateRecoveryWithRealBesuAndStateManagerIntTest {
zkStateManagerVersion = "2.3.0",
logger = LogManager.getLogger("test.clients.l1.state-manager")
)
}
private lateinit var rollupDeploymentResult: LineaRollupDeploymentResult
@Test
fun setupDeployContractForL2L1StateReplay() {
configureLoggers(
rootLevel = Level.INFO,
log.name to Level.DEBUG,
"net.consensys.linea.contract.Web3JContractAsyncHelper" to Level.WARN,
"test.clients.l1.executionlayer" to Level.INFO,
"test.clients.l1.web3j-default" to Level.INFO,
@@ -82,72 +83,66 @@ class StateRecoveryWithRealBesuAndStateManagerIntTest {
"test.clients.l1.blobscan" to Level.INFO,
"net.consensys.linea.contract.l1" to Level.INFO
)
val aggregationsAndBlobs: List<AggregationAndBlobs> = loadBlobsAndAggregationsSortedAndGrouped(
blobsResponsesDir = "$testDataDir/compression/responses",
aggregationsResponsesDir = "$testDataDir/aggregation/responses"
)
}
@Test
@Order(1)
fun `should recover status from genesis - seed data replay`() {
this.rollupDeploymentResult = ContractsManager.get()
.deployLineaRollup(numberOfOperators = 2, contractVersion = LineaContractVersion.V6).get()
log.info("""LineaRollup address=${rollupDeploymentResult.contractAddress}""")
.deployLineaRollup(numberOfOperators = 2, contractVersion = LineaContractVersion.V6)
.get()
log.info("LineaRollup address={}", rollupDeploymentResult.contractAddress)
contractClientForBlobSubmission = rollupDeploymentResult.rollupOperatorClient
contractClientForAggregationSubmission = connectToLineaRollupContract(
rollupDeploymentResult.contractAddress,
// index 0 is the first operator in rollupOperatorClient
rollupDeploymentResult.rollupOperators[1].txManager,
smartContractErrors = lineaRollupContractErrors
)
log.info("starting stack for recovery of state pushed to L1")
val staterecoveryNodesStartFuture = SafeFuture.supplyAsync {
Runner.executeCommand(
"make staterecovery-replay-from-genesis L1_ROLLUP_CONTRACT_ADDRESS=${rollupDeploymentResult.contractAddress}"
)
}
val staterecoveryNodesStartFuture = execCommandAndAssertSuccess(
"make staterecovery-replay-from-block " +
"L1_ROLLUP_CONTRACT_ADDRESS=${rollupDeploymentResult.contractAddress} " +
"PLUGIN_STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER=1",
log = log
)
val lastAggregationAndBlobs = aggregationsAndBlobs.findLast { it.aggregation != null }!!
val lastAggregation = lastAggregationAndBlobs.aggregation!!
val blobsSubmissionFuture = SafeFuture.supplyAsync {
submitBlobsAndAggregationsAndWaitExecution(
contractClientForBlobSubmission = rollupDeploymentResult.rollupOperatorClient,
contractClientForAggregationSubmission = connectToLineaRollupContract(
rollupDeploymentResult.contractAddress,
// index 0 is the first operator in rollupOperatorClient
rollupDeploymentResult.rollupOperators[1].txManager,
smartContractErrors = lineaRollupContractErrors
),
contractClientForBlobSubmission = contractClientForBlobSubmission,
contractClientForAggregationSubmission = contractClientForAggregationSubmission,
aggregationsAndBlobs = aggregationsAndBlobs,
blobChunksSize = 6,
l1Web3jClient = Web3jClientManager.l1Client,
waitTimeout = 4.minutes
)
log.info("finalization={} executed on l1", lastAggregation.intervalString())
}
SafeFuture.allOf(staterecoveryNodesStartFuture, blobsSubmissionFuture).get()
val web3jElClient = createWeb3jHttpClient(executionLayerUrl)
// wait for state-manager to be up and running
await()
.pollInterval(1.seconds.toJavaDuration())
.atMost(5.minutes.toJavaDuration())
.untilAsserted {
kotlin.runCatching {
assertThat(web3jElClient.ethBlockNumber().send().blockNumber.toLong()).isGreaterThanOrEqualTo(0L)
}.getOrElse {
log.info("waiting for Besu to start, trying to connect to $executionLayerUrl")
throw AssertionError("could not connect to $executionLayerUrl", it)
}
}
// wait for Besu to be up and running
waitExecutionLayerToBeUpAndRunning(executionLayerUrl, log = log)
val lastAggregationAndBlobs = aggregationsAndBlobs.findLast { it.aggregation != null }!!
val lastAggregation = lastAggregationAndBlobs.aggregation!!
await()
.untilAsserted {
assertThat(
rollupDeploymentResult.rollupOperatorClient
.finalizedL2BlockNumber(blockParameter = BlockParameter.Tag.LATEST).get()
).isGreaterThanOrEqualTo(lastAggregation.endBlockNumber)
}
log.info("finalization={} executed on l1", lastAggregation.intervalString())
assertBesuAndShomeiStateRootMatches(web3jElClient, stateManagerClient, lastAggregationAndBlobs)
}
val expectedZkEndStateRootHash = lastAggregationAndBlobs.blobs.last().blobCompressionProof!!.finalStateRootHash
await()
.atMost(5.minutes.toJavaDuration())
.untilAsserted {
assertThat(web3jElClient.ethBlockNumber().send().blockNumber.toULong())
.isGreaterThanOrEqualTo(lastAggregation.endBlockNumber)
val blockInterval = BlockInterval(lastAggregation.endBlockNumber, lastAggregation.endBlockNumber)
assertThat(stateManagerClient.rollupGetStateMerkleProof(blockInterval).get().zkEndStateRootHash)
.isEqualTo(expectedZkEndStateRootHash)
}
private fun assertBesuAndShomeiStateRootMatches(
web3jElClient: Web3j,
stateManagerClient: StateManagerClientV1,
targetAggregationAndBlobs: AggregationAndBlobs
) {
val targetAggregation = targetAggregationAndBlobs.aggregation!!
val expectedZkEndStateRootHash = targetAggregationAndBlobs.blobs.last().blobCompressionProof!!.finalStateRootHash
assertBesuAndShomeiStateRootMatches(
web3jElClient,
stateManagerClient,
targetAggregation.endBlockNumber,
expectedZkEndStateRootHash
)
}
}

View File

@@ -35,7 +35,7 @@ import kotlin.time.toJavaDuration
@ExtendWith(VertxExtension::class)
class StateRecoverSepoliaWithFakeExecutionClientIntTest {
private val log = LogManager.getLogger("test.case.StateRecoverSepoliaWithFakeExecutionClientIntTest")
private lateinit var stateRecoverApp: StateRecoverApp
private lateinit var stateRecoverApp: StateRecoveryApp
private lateinit var logsSearcher: EthLogsSearcher
private lateinit var executionLayerClient: FakeExecutionLayerClient
private lateinit var blobFetcher: BlobFetcher
@@ -91,7 +91,7 @@ class StateRecoverSepoliaWithFakeExecutionClientIntTest {
fakeStateManagerClient = FakeStateManagerClientReadFromL1(
headBlockNumber = ULong.MAX_VALUE,
logsSearcher = logsSearcher,
contractAddress = StateRecoverApp.Config.lineaSepolia.smartContractAddress
contractAddress = StateRecoveryApp.Config.lineaSepolia.smartContractAddress
)
transactionDetailsClient = VertxTransactionDetailsClient.create(
jsonRpcClientFactory = jsonRpcFactory,
@@ -109,10 +109,10 @@ class StateRecoverSepoliaWithFakeExecutionClientIntTest {
requestResponseLogLevel = Level.INFO,
failuresLogLevel = Level.DEBUG
),
contractAddress = StateRecoverApp.Config.lineaSepolia.smartContractAddress
contractAddress = StateRecoveryApp.Config.lineaSepolia.smartContractAddress
)
stateRecoverApp = StateRecoverApp(
stateRecoverApp = StateRecoveryApp(
vertx = vertx,
elClient = executionLayerClient,
blobFetcher = blobFetcher,
@@ -121,7 +121,7 @@ class StateRecoverSepoliaWithFakeExecutionClientIntTest {
transactionDetailsClient = transactionDetailsClient,
blockHeaderStaticFields = BlockHeaderStaticFields.localDev,
lineaContractClient = lineaContractClient,
config = StateRecoverApp.Config(
config = StateRecoveryApp.Config(
l1LatestSearchBlock = BlockParameter.Tag.LATEST,
l1PollingInterval = 5.seconds,
executionClientPollingInterval = 1.seconds,

View File

@@ -0,0 +1,67 @@
package linea.staterecovery.test
import build.linea.clients.StateManagerClientV1
import build.linea.domain.BlockInterval
import linea.testing.CommandResult
import linea.testing.Runner
import linea.web3j.createWeb3jHttpClient
import net.consensys.toULong
import org.apache.logging.log4j.Logger
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.web3j.protocol.Web3j
import tech.pegasys.teku.infrastructure.async.SafeFuture
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
fun execCommandAndAssertSuccess(
command: String,
log: Logger
): SafeFuture<CommandResult> {
return Runner
.executeCommandFailOnNonZeroExitCode(command, log = log)
.thenPeek { execResult ->
log.debug("STDOUT: {}", execResult.stdOutStr)
log.debug("STDERR: {}", execResult.stdErrStr)
assertThat(execResult.isSuccess).isTrue()
}
}
fun assertBesuAndShomeiStateRootMatches(
web3jElClient: Web3j,
stateManagerClient: StateManagerClientV1,
expectedBlockNumber: ULong,
expectedZkEndStateRootHash: ByteArray
) {
await()
.pollInterval(1.seconds.toJavaDuration())
.atMost(5.minutes.toJavaDuration())
.untilAsserted {
assertThat(web3jElClient.ethBlockNumber().send().blockNumber.toULong())
.isGreaterThanOrEqualTo(expectedBlockNumber)
val blockInterval = BlockInterval(expectedBlockNumber, expectedBlockNumber)
assertThat(stateManagerClient.rollupGetStateMerkleProof(blockInterval).get().zkEndStateRootHash)
.isEqualTo(expectedZkEndStateRootHash)
}
}
fun waitExecutionLayerToBeUpAndRunning(
executionLayerUrl: String,
expectedHeadBlockNumber: ULong = 0UL,
log: Logger
) {
val web3jElClient = createWeb3jHttpClient(executionLayerUrl)
await()
.pollInterval(1.seconds.toJavaDuration())
.atMost(5.minutes.toJavaDuration())
.untilAsserted {
runCatching {
assertThat(web3jElClient.ethBlockNumber().send().blockNumber.toULong())
.isGreaterThanOrEqualTo(expectedHeadBlockNumber)
}.getOrElse {
log.info("waiting for Besu to start, trying to connect to $executionLayerUrl")
throw AssertionError("could not connect to $executionLayerUrl", it)
}
}
}

View File

@@ -0,0 +1,27 @@
package linea.staterecovery.test
import build.linea.domain.EthLogEvent
import linea.staterecovery.DataFinalizedV3
import linea.web3j.Web3JLogsSearcher
import net.consensys.linea.BlockParameter
fun getLastFinalizationOnL1(
logsSearcher: Web3JLogsSearcher,
contractAddress: String
): EthLogEvent<DataFinalizedV3> {
return getFinalizationsOnL1(logsSearcher, contractAddress)
.lastOrNull()
?: error("no finalization found")
}
fun getFinalizationsOnL1(
logsSearcher: Web3JLogsSearcher,
contractAddress: String
): List<EthLogEvent<DataFinalizedV3>> {
return logsSearcher.getLogs(
fromBlock = BlockParameter.Tag.EARLIEST,
toBlock = BlockParameter.Tag.LATEST,
address = contractAddress,
topics = listOf(DataFinalizedV3.topic)
).get().map(DataFinalizedV3.Companion::fromEthLog)
}