Staterecovery adapt to new besu (#642)

* staterecovery: adapt to Besu 25.6-delivery46 and support forced stop sync for debug purposes
This commit is contained in:
Pedro Novais
2025-02-06 10:28:03 +00:00
committed by GitHub
parent ac95bbd3ae
commit 4d85082d33
28 changed files with 719 additions and 187 deletions

View File

@@ -23,8 +23,8 @@ clean-testnet-folders:
rm -rf tmp/testnet/* || true # ignore failure if folders do not exist already
clean-environment:
docker compose -f docker/compose-tracing-v1-ci-extension.yml -f docker/compose-tracing-v2-ci-extension.yml --profile l1 --profile l2 --profile debug --profile staterecovery kill -s 9 || true;
docker compose -f docker/compose-tracing-v1-ci-extension.yml -f docker/compose-tracing-v2-ci-extension.yml --profile l1 --profile l2 --profile debug --profile staterecovery down || true;
docker compose -f docker/compose-tracing-v1-ci-extension.yml -f docker/compose-tracing-v2-ci-extension.yml -f docker/compose-tracing-v2-staterecovery-extension.yml --profile l1 --profile l2 --profile debug --profile staterecovery kill -s 9 || true;
docker compose -f docker/compose-tracing-v1-ci-extension.yml -f docker/compose-tracing-v2-ci-extension.yml -f docker/compose-tracing-v2-staterecovery-extension.yml --profile l1 --profile l2 --profile debug --profile staterecovery down || true;
make clean-local-folders;
docker volume rm linea-local-dev linea-logs || true; # ignore failure if volumes do not exist already
docker system prune -f || true;
@@ -81,7 +81,7 @@ start-env-with-tracing-v2-ci:
start-env-with-staterecovery: COMPOSE_PROFILES:=l1,l2,staterecovery
start-env-with-staterecovery: L1_CONTRACT_VERSION:=6
start-env-with-staterecovery:
make start-env COMPOSE_FILE=docker/compose-tracing-v2-staterecovery-extension.yml LINEA_PROTOCOL_CONTRACTS_ONLY=true L1_CONTRACT_VERSION=$(L1_CONTRACT_VERSION)
make start-env COMPOSE_FILE=docker/compose-tracing-v2-staterecovery-extension.yml LINEA_PROTOCOL_CONTRACTS_ONLY=true L1_CONTRACT_VERSION=$(L1_CONTRACT_VERSION) COMPOSE_PROFILES=$(COMPOSE_PROFILES)
staterecovery-replay-from-block: L1_ROLLUP_CONTRACT_ADDRESS:=0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9
staterecovery-replay-from-block: STATERECOVERY_OVERRIDE_START_BLOCK_NUMBER:=1

View File

@@ -112,7 +112,7 @@
<DebouncingFilter/>
<appender-ref ref="rewrite"/>
</Logger>
<Logger name="clients.l1" level="INFO" additivity="false">
<Logger name="clients.l2" level="DEBUG" additivity="false">
<DebouncingFilter/>
<appender-ref ref="console"/>
</Logger>

View File

@@ -4,12 +4,12 @@ import io.micrometer.core.instrument.MeterRegistry
import io.vertx.core.Vertx
import io.vertx.micrometer.backends.BackendRegistries
import io.vertx.sqlclient.SqlClient
import linea.web3j.createWeb3jHttpClient
import net.consensys.linea.async.toSafeFuture
import net.consensys.linea.jsonrpc.client.LoadBalancingJsonRpcClient
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.linea.vertx.loadVertxConfig
import net.consensys.linea.web3j.okHttpClientBuilder
import net.consensys.zkevm.coordinator.api.Api
import net.consensys.zkevm.coordinator.app.config.CoordinatorConfig
import net.consensys.zkevm.coordinator.app.config.DatabaseConfig
@@ -31,9 +31,8 @@ import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.web3j.protocol.Web3j
import org.web3j.protocol.http.HttpService
import org.web3j.utils.Async
import tech.pegasys.teku.infrastructure.async.SafeFuture
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration
class CoordinatorApp(private val configs: CoordinatorConfig) {
@@ -60,15 +59,13 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {
),
vertx
)
private val l2Web3jClient: Web3j =
Web3j.build(
HttpService(
configs.l2.rpcEndpoint.toString(),
okHttpClientBuilder(LogManager.getLogger("clients.l2")).build()
),
1000,
Async.defaultExecutorService()
)
private val l2Web3jClient: Web3j = createWeb3jHttpClient(
rpcUrl = configs.zkTraces.ethApi.toString(),
log = LogManager.getLogger("clients.l2.eth-api.rpc-node"),
pollingInterval = 1.seconds,
requestResponseLogLevel = Level.TRACE,
failuresLogLevel = Level.DEBUG
)
private val persistenceRetryer = PersistenceRetryer(
vertx = vertx,

View File

@@ -8,6 +8,7 @@ import build.linea.web3j.Web3JLogsClient
import io.vertx.core.Vertx
import kotlinx.datetime.Clock
import linea.encoding.BlockRLPEncoder
import linea.web3j.createWeb3jHttpClient
import net.consensys.linea.BlockNumberAndHash
import net.consensys.linea.blob.ShnarfCalculatorVersion
import net.consensys.linea.contract.Web3JL2MessageService
@@ -37,7 +38,6 @@ import net.consensys.linea.traces.TracesCountersV2
import net.consensys.linea.web3j.ExtendedWeb3JImpl
import net.consensys.linea.web3j.SmartContractErrors
import net.consensys.linea.web3j.Web3jBlobExtended
import net.consensys.linea.web3j.okHttpClientBuilder
import net.consensys.zkevm.LongRunningService
import net.consensys.zkevm.coordinator.app.config.CoordinatorConfig
import net.consensys.zkevm.coordinator.blockcreation.BatchesRepoBasedLastProvenBlockNumberProvider
@@ -106,7 +106,6 @@ import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.web3j.protocol.Web3j
import org.web3j.protocol.http.HttpService
import org.web3j.utils.Async
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
@@ -148,21 +147,20 @@ class L1DependentApp(
l2Web3jClient,
smartContractErrors
)
private val l1Web3jClient = Web3j.build(
HttpService(
configs.l1.rpcEndpoint.toString(),
okHttpClientBuilder(LogManager.getLogger("clients.l1")).build()
),
1000,
Async.defaultExecutorService()
private val l1Web3jClient = createWeb3jHttpClient(
rpcUrl = configs.l1.rpcEndpoint.toString(),
log = LogManager.getLogger("clients.l1.eth-api"),
pollingInterval = 1.seconds
)
private val l1Web3jService = Web3jBlobExtended(HttpService(configs.l1.ethFeeHistoryEndpoint.toString()))
private val l2ZkTracesWeb3jClient: Web3j =
Web3j.build(
HttpService(configs.zkTraces.ethApi.toString()),
1000,
Async.defaultExecutorService()
)
private val l2ZkTracesWeb3jClient: Web3j = createWeb3jHttpClient(
rpcUrl = configs.zkTraces.ethApi.toString(),
log = LogManager.getLogger("clients.l2.eth-api.tracer-node"),
pollingInterval = 1.seconds,
requestResponseLogLevel = org.apache.logging.log4j.Level.TRACE,
failuresLogLevel = org.apache.logging.log4j.Level.DEBUG
)
private val l1ChainId = l1Web3jClient.ethChainId().send().chainId.toLong()

View File

@@ -585,7 +585,7 @@ services:
ipv4_address: 10.10.10.205
zkbesu-shomei-sr:
image: consensys/linea-besu-package:${BESU_PACKAGE_TAG:-devnet-fc27c01}
image: consensys/linea-besu-package:${BESU_PACKAGE_TAG:-devnet-7dfb8e3}
hostname: zkbesu-shomei-sr
container_name: zkbesu-shomei-sr
profiles: [ "external-to-monorepo", "staterecovery" ]

View File

@@ -22,3 +22,10 @@ class KMath {
fun ULong.plusExact(other: ULong): ULong = KMath.addExact(this, other)
fun UInt.plusExact(other: UInt): UInt = KMath.addExact(this, other)
fun ULong.multiplyExact(other: ULong): ULong {
if (this != 0UL && other != 0UL && ULong.MAX_VALUE / this < other) {
throw ArithmeticException("ULong overflow")
}
return this * other
}

View File

@@ -6,6 +6,7 @@ import java.math.MathContext
import java.math.RoundingMode
const val OneGWei = 1_000_000_000L
const val OneEth = 1_000_000_000_000_000_000L
val OneGWeiBigDecimal: BigDecimal = BigDecimal.valueOf(OneGWei)
const val OneKWei = 1_000L
@@ -56,7 +57,8 @@ fun ULong.toHexStringUInt256(): String = this.toHexStringPaddedToBitSize(256)
fun ULong.toKWeiUInt(): UInt = this.toDouble().tokWeiUInt()
inline val ULong.gwei: ULong get() = this * OneGWei.toULong()
inline val ULong.gwei: ULong get() = this.multiplyExact(OneGWei.toULong())
inline val ULong.eth: ULong get() = this.multiplyExact(OneEth.toULong())
fun ULong.toGWei(): Double = this.toDouble().toGWei()

View File

@@ -24,4 +24,14 @@ class MathExtensionsTest {
.isInstanceOf(ArithmeticException::class.java)
.withFailMessage("ULong overflow")
}
@Test
fun `ULong MultiplyExact`() {
assertThat(2UL.multiplyExact(3UL)).isEqualTo(6UL)
assertThat(0UL.multiplyExact(123456789UL)).isEqualTo(0UL)
assertThat(ULong.MAX_VALUE.multiplyExact(1UL)).isEqualTo(ULong.MAX_VALUE)
assertThatThrownBy { ULong.MAX_VALUE.multiplyExact(2UL) }
.isInstanceOf(ArithmeticException::class.java)
.hasMessage("ULong overflow")
}
}

View File

@@ -6,7 +6,7 @@ plugins {
//def besuArtifactGroup="org.hyperledger.besu"
//def besuVersion=libs.versions.besu.get()
def besuArtifactGroup="io.consensys.linea-besu"
def besuVersion="25.1-delivery43"
def besuVersion="25.2-delivery46"
dependencies {
api("${besuArtifactGroup}:besu-datatypes:${besuVersion}") {

View File

@@ -6,6 +6,7 @@ import net.consensys.toBigInteger
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.bytes.Bytes32
import org.hyperledger.besu.crypto.SECP256K1
import org.hyperledger.besu.crypto.SECPSignature
import org.hyperledger.besu.datatypes.AccessListEntry
import org.hyperledger.besu.datatypes.Address
import org.hyperledger.besu.datatypes.Hash
@@ -25,6 +26,14 @@ object MapperLineaDomainToBesu {
private val secp256k1 = SECP256K1()
private val blockHeaderFunctions = MainnetBlockHeaderFunctions()
internal fun signature(tx: linea.domain.Transaction, recId: Byte): SECPSignature {
return secp256k1.createSignature(
tx.r,
tx.s,
recId
)
}
fun recIdFromV(v: BigInteger): Pair<Byte, BigInteger?> {
val recId: Byte
var chainId: BigInteger? = null
@@ -41,9 +50,9 @@ object MapperLineaDomainToBesu {
fun getRecIdAndChainId(tx: linea.domain.Transaction): Pair<Byte, BigInteger?> {
if (tx.type == TransactionType.FRONTIER) {
return recIdFromV(tx.v.toBigInteger())
return recIdFromV(tx.v!!.toBigInteger())
} else {
return tx.v.toByte() to tx.chainId?.toBigInteger()
return (tx.yParity ?: tx.v)!!.toByte() to tx.chainId?.toBigInteger()
}
}
@@ -86,7 +95,7 @@ object MapperLineaDomainToBesu {
if (th.message?.startsWith("Error mapping transaction to Besu") ?: false) {
throw th
} else {
throw RuntimeException("Error mapping block to Besu: block=${block.number}", th)
throw RuntimeException("Error mapping block=${block.number} to Besu: ${th.message}", th)
}
}
}
@@ -103,16 +112,12 @@ object MapperLineaDomainToBesu {
fun mapToBesu(tx: linea.domain.Transaction): Transaction {
val (recId, recChainId) = getRecIdAndChainId(tx)
val signature = secp256k1.createSignature(
tx.r,
tx.s,
recId
)
val signature = signature(tx, recId)
val besuType = tx.type.toBesu()
val chainId = tx.chainId?.toBigInteger() ?: recChainId
return Transaction.builder()
.type(tx.type.toBesu())
.type(besuType)
.nonce(tx.nonce.toLong())
.apply { tx.gasPrice?.let { gasPrice(it.toWei()) } }
.gasLimit(tx.gasLimit.toLong())

View File

@@ -12,6 +12,8 @@ dependencies {
testImplementation project(":jvm-libs:linea:blob-compressor")
testImplementation(testFixtures(project(":jvm-libs:linea:blob-compressor")))
testImplementation(testFixtures(project(":jvm-libs:linea:core:domain-models")))
testImplementation(project(":jvm-libs:linea:besu-rlp-and-mappers"))
testImplementation(project(":jvm-libs:linea:testing:file-system"))
testImplementation("io.tmio:tuweni-bytes:${libs.versions.tuweni.get()}")
testImplementation(project(":jvm-libs:linea:besu-libs"))

View File

@@ -1,42 +1,144 @@
package net.consensys.linea.blob
import kotlinx.datetime.Instant
import linea.blob.BlobCompressor
import linea.blob.GoBackedBlobCompressor
import linea.domain.AccessListEntry
import linea.domain.TransactionFactory
import linea.domain.createBlock
import linea.domain.toBesu
import linea.rlp.BesuRlpBlobDecoder
import linea.rlp.RLP
import net.consensys.decodeHex
import net.consensys.eth
import net.consensys.linea.nativecompressor.CompressorTestData
import net.consensys.toBigInteger
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import kotlin.jvm.optionals.getOrNull
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class GoNativeBlobDecompressorTest {
private val blobCompressedLimit = 30 * 1024
private lateinit var compressor: GoNativeBlobCompressor
private lateinit var decompressor: BlobDecompressor
private val compressor: BlobCompressor = GoBackedBlobCompressor
.getInstance(BlobCompressorVersion.V1_0_1, blobCompressedLimit.toUInt())
private val decompressor: BlobDecompressor =
GoNativeBlobDecompressorFactory.getInstance(BlobDecompressorVersion.V1_1_0)
@BeforeEach
fun beforeEach() {
compressor = GoNativeBlobCompressorFactory
.getInstance(BlobCompressorVersion.V1_0_1)
.apply {
Init(
dataLimit = blobCompressedLimit,
dictPath = GoNativeBlobCompressorFactory.dictionaryPath.toAbsolutePath().toString()
)
Reset()
}
decompressor = GoNativeBlobDecompressorFactory.getInstance(BlobDecompressorVersion.V1_1_0)
compressor.reset()
}
@Test
fun `when blocks are compressed with compressor shall decompress them back`() {
val blocks = CompressorTestData.blocksRlpEncoded
assertTrue(compressor.Write(blocks[0], blocks[0].size))
assertTrue(compressor.Write(blocks[1], blocks[1].size))
compressor.appendBlock(blocks[0])
compressor.appendBlock(blocks[1])
val compressedData = ByteArray(compressor.Len())
compressor.Bytes(compressedData)
val compressedData = compressor.getCompressedData()
val decompressedBlob = decompressor.decompress(compressedData)
assertThat(decompressedBlob.size).isGreaterThan(compressedData.size)
val decompressedBlocks: List<ByteArray> = rlpDecodeAsListOfBytes(decompressedBlob)
assertThat(decompressedBlocks).hasSize(2)
}
@Test
fun `should decompress original data`() {
val tx0 = TransactionFactory.createTransactionFrontier(
nonce = 10uL,
gasLimit = 22_0000uL,
to = null,
value = 1uL.eth.toBigInteger(),
input = byteArrayOf()
)
val tx1 = TransactionFactory.createTransactionEip1559(
nonce = 123uL,
gasLimit = 23_0000uL,
to = null,
value = 2uL.eth.toBigInteger(),
input = "0x1234".toByteArray(),
accessList = listOf(
AccessListEntry(
address = "0x0000000000000000000000000000000000000001".decodeHex(),
storageKeys = listOf(
"0x0000000000000000000000000000000000000000000000000000000000000001".decodeHex(),
"0x0000000000000000000000000000000000000000000000000000000000000002".decodeHex()
)
),
AccessListEntry(
address = "0x0000000000000000000000000000000000000002".decodeHex(),
storageKeys = listOf(
"0x0000000000000000000000000000000000000000000000000000000000000011".decodeHex(),
"0x0000000000000000000000000000000000000000000000000000000000000012".decodeHex()
)
)
)
)
val originalBesuBlock = createBlock(
number = 123uL,
timestamp = Instant.parse("2025-01-02T12:23:45Z"),
transactions = listOf(tx0, tx1)
).toBesu()
compressor.appendBlock(RLP.encodeBlock(originalBesuBlock))
val decompressedData = decompressor.decompress(compressor.getCompressedData())
val decompressedBlocks: List<ByteArray> = rlpDecodeAsListOfBytes(decompressedData)
assertThat(decompressedBlocks).hasSize(1)
val decompressedBlock = decompressedBlocks[0]
val decodedBlock = BesuRlpBlobDecoder.decode(decompressedBlock)
// Only BlockHash and Timestamp are compressed to the Blob
assertThat(decodedBlock.header.hash).isEqualTo(originalBesuBlock.header.hash)
assertThat(decodedBlock.header.timestamp).isEqualTo(Instant.parse("2025-01-02T12:23:45Z").epochSeconds)
assertThat(decodedBlock.body.transactions).hasSize(2)
val decompressedTx0 = decodedBlock.body.transactions[0]
val decompressedTx1 = decodedBlock.body.transactions[1]
assertThat(decompressedTx0.type).isEqualTo(tx0.type.toBesu())
assertThat(decompressedTx0.sender.toArray()).isEqualTo(tx0.toBesu().sender.toArray())
assertThat(decompressedTx0.nonce.toULong()).isEqualTo(tx0.nonce)
assertThat(decompressedTx0.gasLimit.toULong()).isEqualTo(tx0.gasLimit)
assertThat(decompressedTx0.maxFeePerGas.getOrNull()).isNull()
assertThat(decompressedTx0.maxPriorityFeePerGas.getOrNull()).isNull()
assertThat(decompressedTx0.gasPrice.getOrNull()?.asBigInteger).isEqualTo(tx0.gasPrice!!.toBigInteger())
assertThat(decompressedTx0.to.getOrNull()?.toArray()).isEqualTo(tx0.to)
assertThat(decompressedTx0.value.asBigInteger).isEqualTo(tx0.value)
assertThat(decompressedTx0.payload.toArray()).isEqualTo(tx0.input)
assertThat(decompressedTx0.accessList.getOrNull()).isNull()
assertThat(decompressedTx1.type).isEqualTo(tx1.type.toBesu())
assertThat(decompressedTx1.sender.toArray()).isEqualTo(tx1.toBesu().sender.toArray())
assertThat(decompressedTx1.nonce.toULong()).isEqualTo(tx1.nonce)
assertThat(decompressedTx1.gasLimit.toULong()).isEqualTo(tx1.gasLimit)
assertThat(decompressedTx1.maxFeePerGas.getOrNull()?.asBigInteger)
.isEqualTo(tx1.maxFeePerGas?.toBigInteger())
assertThat(decompressedTx1.maxPriorityFeePerGas.getOrNull()?.asBigInteger)
.isEqualTo(tx1.maxPriorityFeePerGas?.toBigInteger())
assertThat(decompressedTx1.gasPrice.getOrNull()).isNull()
assertThat(decompressedTx1.to.getOrNull()?.toArray()).isEqualTo(tx1.to)
assertThat(decompressedTx1.value.asBigInteger).isEqualTo(tx1.value)
assertThat(decompressedTx1.payload.toArray()).isEqualTo(tx1.input)
assertThat(decompressedTx1.accessList.getOrNull()).isNotNull
decompressedTx1.accessList.getOrNull()!!.also { decompressedAccList ->
assertThat(decompressedAccList).hasSize(2)
assertThat(decompressedAccList[0]!!.address.toArray())
.isEqualTo(tx1.accessList!![0].address)
assertThat(decompressedAccList[0]!!.storageKeys[0].toArray())
.isEqualTo(tx1.accessList!![0].storageKeys[0])
assertThat(decompressedAccList[0]!!.storageKeys[1].toArray())
.isEqualTo(tx1.accessList!![0].storageKeys[1])
assertThat(decompressedAccList[1]!!.address.toArray())
.isEqualTo(tx1.accessList!![1].address)
assertThat(decompressedAccList[1]!!.storageKeys[0].toArray())
.isEqualTo(tx1.accessList!![1].storageKeys[0])
assertThat(decompressedAccList[1]!!.storageKeys[1].toArray())
.isEqualTo(tx1.accessList!![1].storageKeys[1])
}
}
}

View File

@@ -8,6 +8,7 @@ description="Linea domain models"
dependencies {
implementation project(":jvm-libs:generic:extensions:kotlin")
testFixturesApi "org.jetbrains.kotlinx:kotlinx-datetime:${libs.versions.kotlinxDatetime.get()}"
testFixturesApi project(":jvm-libs:linea:besu-libs")
}
jar {

View File

@@ -59,8 +59,8 @@ data class Transaction(
val input: ByteArray,
val r: BigInteger,
val s: BigInteger,
val v: ULong,
val yParity: ULong?,
val v: ULong?, // is defined if type is FRONTIER
val yParity: ULong?, // EIP-2718 yParity is defined for all transactions types after FRONTIER
val chainId: ULong? = null, // Optional field for EIP-155 transactions
val gasPrice: ULong?, // null for EIP-1559 transactions
val maxFeePerGas: ULong? = null, // null for EIP-1559 transactions

View File

@@ -5,6 +5,7 @@ import kotlinx.datetime.Instant
import net.consensys.ByteArrayExt
val zeroHash = ByteArray(32) { 0 }
val zeroAddress = ByteArray(20) { 0 }
fun createBlock(
number: ULong = 0UL,
@@ -15,7 +16,7 @@ fun createBlock(
parentHash: ByteArray = ByteArrayExt.random32(),
stateRoot: ByteArray = ByteArrayExt.random32(),
receiptsRoot: ByteArray = ByteArrayExt.random32(),
logsBloom: ByteArray = ByteArrayExt.random32(),
logsBloom: ByteArray = ByteArrayExt.random(size = 256),
ommersHash: ByteArray = ByteArrayExt.random32(),
timestamp: Instant = Clock.System.now(),
extraData: ByteArray = ByteArrayExt.random32(),
@@ -28,7 +29,7 @@ fun createBlock(
hash = hash,
parentHash = parentHash,
ommersHash = ommersHash,
miner = zeroHash,
miner = zeroAddress,
stateRoot = stateRoot,
transactionsRoot = transactionsRoot,
receiptsRoot = receiptsRoot,

View File

@@ -0,0 +1,240 @@
package linea.domain
import net.consensys.eth
import net.consensys.gwei
import net.consensys.toBigInteger
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.bytes.Bytes32
import org.hyperledger.besu.crypto.KeyPair
import org.hyperledger.besu.crypto.SECP256K1
import org.hyperledger.besu.crypto.SECPSignature
import org.hyperledger.besu.crypto.SignatureAlgorithm
import org.hyperledger.besu.datatypes.Address
import org.hyperledger.besu.datatypes.Wei
import java.math.BigInteger
import kotlin.random.Random
object TransactionFactory {
val secP256K1: SignatureAlgorithm = SECP256K1()
val defaltSecp256k1: KeyPair = secP256K1.generateKeyPair()
fun createTransactionFrontier(
nonce: ULong = 0UL,
gasLimit: ULong = 21_000UL,
to: ByteArray? = Random.nextBytes(20), // Nullable for contract creation transactions
value: BigInteger = 1UL.eth.toBigInteger(),
input: ByteArray = ByteArray(0),
r: BigInteger? = null,
s: BigInteger? = null,
v: ULong? = null,
chainId: ULong? = null, // Optional field for EIP-155 transactions
gasPrice: ULong? = 3UL.gwei, // null for EIP-1559 transactions
accessList: List<AccessListEntry>? = null // null non for EIP-2930 transactions
): Transaction {
return createTransaction(
type = TransactionType.FRONTIER,
nonce = nonce,
gasLimit = gasLimit,
to = to,
value = value,
input = input,
r = r,
s = s,
v = v,
yParity = null,
chainId = chainId,
gasPrice = gasPrice,
maxFeePerGas = null,
maxPriorityFeePerGas = null,
accessList = accessList
)
}
fun createTransactionEip1559(
nonce: ULong = 0UL,
gasLimit: ULong = 21_000UL,
to: ByteArray? = Random.nextBytes(20), // Nullable for contract creation transactions
value: BigInteger = 1UL.eth.toBigInteger(),
input: ByteArray = ByteArray(0),
r: BigInteger? = null,
s: BigInteger? = null,
yParity: ULong? = null,
chainId: ULong = 1337UL, // Optional field for EIP-155 transactions
maxFeePerGas: ULong? = 3UL.gwei, // null for EIP-1559 transactions
maxPriorityFeePerGas: ULong? = 2UL.gwei, // null for non EIP-1559 transactions
accessList: List<AccessListEntry>? = null // null non for EIP-2930 transactions
): Transaction = createTransaction(
type = TransactionType.EIP1559,
nonce = nonce,
gasLimit = gasLimit,
to = to,
value = value,
input = input,
r = r,
s = s,
v = yParity,
yParity = yParity,
chainId = chainId,
gasPrice = null,
maxFeePerGas = maxFeePerGas,
maxPriorityFeePerGas = maxPriorityFeePerGas,
accessList = accessList
)
fun createTransaction(
type: TransactionType = TransactionType.EIP1559,
nonce: ULong = 0UL,
gasLimit: ULong = 21_000UL,
to: ByteArray? = Random.nextBytes(20), // Nullable for contract creation transactions
value: BigInteger = 1UL.eth.toBigInteger(),
input: ByteArray = ByteArray(0),
r: BigInteger? = null,
s: BigInteger? = null,
v: ULong? = null,
yParity: ULong? = null,
chainId: ULong? = null, // Optional field for EIP-155 transactions
gasPrice: ULong? = null, // null for EIP-1559 transactions
maxFeePerGas: ULong? = 3UL.gwei, // null for EIP-1559 transactions
maxPriorityFeePerGas: ULong? = 2UL.gwei, // null for non EIP-1559 transactions
accessList: List<AccessListEntry>? = null // null non for EIP-2930 transactions
): Transaction {
val signatureArgs = listOfNotNull(r, s, v)
require(signatureArgs.let { it.size == 3 || it.isEmpty() }) {
"Either all of r, s, and v must be null or all of them must be non-null"
}
val eR: BigInteger
val eS: BigInteger
val eV: ULong?
val eyParity: ULong?
if (signatureArgs.isEmpty()) {
val sig = computeSignature(
type = type,
nonce = nonce,
gasLimit = gasLimit,
to = to,
value = value,
input = input,
chainId = chainId,
gasPrice = gasPrice,
maxFeePerGas = maxFeePerGas,
maxPriorityFeePerGas = maxPriorityFeePerGas,
accessList = accessList
)
eR = sig.r
eS = sig.s
eyParity = sig.recId.toULong()
eV = calcV(type, sig, chainId) ?: eyParity
} else {
eR = r!!
eS = s!!
eV = v!!
eyParity = yParity!!
}
return Transaction(
type = type,
nonce = nonce,
gasLimit = gasLimit,
to = to,
value = value,
input = input,
r = eR,
s = eS,
v = eV,
yParity = eyParity,
chainId = chainId,
gasPrice = gasPrice,
maxFeePerGas = maxFeePerGas,
maxPriorityFeePerGas = maxPriorityFeePerGas,
accessList = accessList
)
}
fun Transaction.computeSignature(
keyPair: KeyPair = defaltSecp256k1
): SECPSignature {
return computeSignature(
type = type,
nonce = nonce,
gasLimit = gasLimit,
to = to,
value = value,
input = input,
chainId = chainId,
gasPrice = gasPrice,
maxFeePerGas = maxFeePerGas,
maxPriorityFeePerGas = maxPriorityFeePerGas,
accessList = accessList,
keyPair = keyPair
)
}
fun computeSignature(
type: TransactionType,
nonce: ULong,
gasLimit: ULong,
to: ByteArray?,
value: BigInteger,
input: ByteArray,
chainId: ULong?,
gasPrice: ULong?,
maxFeePerGas: ULong?,
maxPriorityFeePerGas: ULong?,
accessList: List<AccessListEntry>?,
keyPair: KeyPair = defaltSecp256k1
): SECPSignature {
val besuType = type.toBesu()
return org.hyperledger.besu.ethereum.core.Transaction.builder()
.type(besuType)
.nonce(nonce.toLong())
.apply { gasPrice?.let { gasPrice(it.toWei()) } }
.gasLimit(gasLimit.toLong())
.to(to?.let { Address.wrap(Bytes.wrap(it)) })
.value(value.toWei())
.payload(Bytes.wrap(input))
.apply { chainId?.let { chainId(it.toBigInteger()) } }
.maxPriorityFeePerGas(maxPriorityFeePerGas?.toWei())
.maxFeePerGas(maxFeePerGas?.toWei())
.apply {
if (besuType.supportsAccessList()) {
val accList = accessList?.map { entry ->
org.hyperledger.besu.datatypes.AccessListEntry(
Address.wrap(Bytes.wrap(entry.address)),
entry.storageKeys.map { Bytes32.wrap(it) }
)
} ?: emptyList()
accessList(accList)
}
}
.signAndBuild(keyPair)
.signature
}
fun calcV(
transactionType: TransactionType,
signature: SECPSignature,
chainId: ULong?
): ULong? {
if (transactionType != TransactionType.FRONTIER) {
// EIP-2718 typed transaction, use yParity:
return null
} else {
val recId = signature.getRecId().toULong()
return chainId
?.let { (recId + 35UL) + (2UL * chainId) }
?: (recId + 27UL)
}
}
fun ULong.toWei(): Wei = Wei.of(this.toBigInteger())
fun BigInteger.toWei(): Wei = Wei.of(this)
fun TransactionType.toBesu(): org.hyperledger.besu.datatypes.TransactionType {
return when (this) {
linea.domain.TransactionType.FRONTIER -> org.hyperledger.besu.datatypes.TransactionType.FRONTIER
linea.domain.TransactionType.EIP1559 -> org.hyperledger.besu.datatypes.TransactionType.EIP1559
linea.domain.TransactionType.ACCESS_LIST -> org.hyperledger.besu.datatypes.TransactionType.ACCESS_LIST
linea.domain.TransactionType.BLOB -> org.hyperledger.besu.datatypes.TransactionType.BLOB
linea.domain.TransactionType.DELEGATE_CODE -> org.hyperledger.besu.datatypes.TransactionType.DELEGATE_CODE
}
}
}

View File

@@ -26,6 +26,76 @@ class EthGetBlockToLineaBlockMapperTest {
return ObjectMapperFactory.getObjectMapper().readValue(json, EthBlock.TransactionObject::class.java)
}
@Test
fun `should map frontier transactions without chainId replay protection and null yParity field`() {
val txWeb3j = serialize(
"""
{
"blockHash": "0x8de5957e6b5b519eb889a49604e96d7ace847475a9c3ccfaf0acc87e89175d0f",
"blockNumber": "0x1",
"from": "0x1b9abeec3215d8ade8a33607f2cf0f4f60e5f0d0",
"gas": "0x29e2f7",
"gasPrice": "0x7",
"maxFeePerGas": "0xe",
"maxPriorityFeePerGas": "0x0",
"hash": "0x09ffe43152572dedf9d4c893b0721692fa20a63d74deb8ff6b9d1ce74c1fd17d",
"input": "0x60806040523480156200001157600080fd5b506200001c",
"nonce": "0x0",
"to": null,
"transactionIndex": "0x0",
"value": "0x0",
"type": "0x2",
"accessList": [],
"chainId": "0x539",
"v": "0x0",
"r": "0x1fa31b9272cc67174efb129c2fd2ec5afda122503745beb22bd26e48a42240bb",
"s": "0x248c9cdf9352b4a379577c5b44bcb25a5350dc6722fd7b2aec40e193f670e4f4"
}
""".trimIndent()
)
val domainTx = txWeb3j.toDomain()
assertThat(domainTx).isEqualTo(
Transaction(
nonce = 0x0UL,
gasPrice = null,
gasLimit = 0x29e2f7UL,
to = null,
value = 0UL.toBigInteger(),
input = "0x60806040523480156200001157600080fd5b506200001c".decodeHex(),
r = "0x1fa31b9272cc67174efb129c2fd2ec5afda122503745beb22bd26e48a42240bb".toBigIntegerFromHex(),
s = "0x248c9cdf9352b4a379577c5b44bcb25a5350dc6722fd7b2aec40e193f670e4f4".toBigIntegerFromHex(),
v = 0UL,
yParity = null,
type = TransactionType.EIP1559,
chainId = 0x539UL,
maxFeePerGas = 0xeUL,
maxPriorityFeePerGas = 0x0UL,
accessList = emptyList()
)
)
domainTx.toBesu().also { besuTx ->
assertThat(besuTx.type).isEqualTo(org.hyperledger.besu.datatypes.TransactionType.EIP1559)
assertThat(besuTx.nonce).isEqualTo(0x0L)
assertThat(besuTx.gasPrice.getOrNull()).isNull()
assertThat(besuTx.maxFeePerGas.getOrNull()).isEqualTo(Wei.of(0xeL))
assertThat(besuTx.maxPriorityFeePerGas.getOrNull()).isEqualTo(Wei.of(0x0L))
assertThat(besuTx.gasLimit).isEqualTo(0x29e2f7L)
assertThat(besuTx.to.getOrNull()).isNull()
assertThat(besuTx.value).isEqualTo(Wei.of(0x0L))
assertThat(besuTx.payload).isEqualTo(Bytes.fromHexString("0x60806040523480156200001157600080fd5b506200001c"))
assertThat(besuTx.signature.r).isEqualTo(
"0x1fa31b9272cc67174efb129c2fd2ec5afda122503745beb22bd26e48a42240bb".toBigIntegerFromHex()
)
assertThat(besuTx.signature.s).isEqualTo(
"0x248c9cdf9352b4a379577c5b44bcb25a5350dc6722fd7b2aec40e193f670e4f4".toBigIntegerFromHex()
)
assertThat(besuTx.signature.recId).isEqualTo(0)
assertThat(besuTx.chainId.getOrNull()).isEqualTo(0x539L)
}
}
@Test
fun `should map frontier transactions`() {
val txWeb3j = serialize(

View File

@@ -34,12 +34,6 @@ data class BlockHeaderStaticFields(
val difficulty: ULong = 2UL
) {
companion object {
val mainnet = BlockHeaderStaticFields(
coinbase = "0x8F81e2E3F8b46467523463835F965fFE476E1c9E".decodeHex()
)
val sepolia = BlockHeaderStaticFields(
coinbase = "0x4D517Aef039A48b3B6bF921e210b7551C8E37107".decodeHex()
)
val localDev = BlockHeaderStaticFields(
coinbase = "0x6d976c9b8ceee705d4fe8699b44e5eb58242f484".decodeHex()
)

View File

@@ -43,7 +43,8 @@ class StateRecoveryApp(
* The block number at which the recovery mode will start overriding the recovery start block number
* this is meant for testing purposes, not production
*/
val overridingRecoveryStartBlockNumber: ULong? = null
val overridingRecoveryStartBlockNumber: ULong? = null,
val debugForceSyncStopBlockNumber: ULong? = null
) {
companion object {
val lineaMainnet = Config(
@@ -96,7 +97,8 @@ class StateRecoveryApp(
transactionDetailsClient = transactionDetailsClient,
blobDecompressor = blobDecompressor,
blockImporterAndStateVerifier = blockImporterAndStateVerifier,
pollingInterval = config.l1PollingInterval
pollingInterval = config.l1PollingInterval,
debugForceSyncStopBlockNumber = config.debugForceSyncStopBlockNumber
)
val lastSuccessfullyRecoveredFinalization: EthLogEvent<DataFinalizedV3>?
get() = stateSynchronizerService.lastSuccessfullyProcessedFinalization

View File

@@ -3,7 +3,6 @@ package linea.staterecovery
import build.linea.domain.EthLogEvent
import io.vertx.core.Vertx
import net.consensys.encodeHex
import net.consensys.linea.BlockNumberAndHash
import net.consensys.linea.BlockParameter
import net.consensys.linea.CommonDomainFunctions
import net.consensys.zkevm.PeriodicPollingService
@@ -21,6 +20,7 @@ class StateSynchronizerService(
private val blobDecompressor: BlobDecompressorAndDeserializer,
private val blockImporterAndStateVerifier: BlockImporterAndStateVerifier,
private val pollingInterval: Duration,
private val debugForceSyncStopBlockNumber: ULong?,
private val log: Logger = LogManager.getLogger(StateSynchronizerService::class.java)
) : PeriodicPollingService(
vertx = vertx,
@@ -120,40 +120,55 @@ class StateSynchronizerService(
private fun updateNodeWithBlobsAndVerifyState(
dataSubmissions: List<DataSubmittedEventAndBlobs>,
dataFinalizedV3: DataFinalizedV3
): SafeFuture<BlockNumberAndHash> {
): SafeFuture<Unit> {
return blobDecompressor
.decompress(
startBlockNumber = dataFinalizedV3.startBlockNumber,
blobs = dataSubmissions.flatMap { it.blobs }
)
.thenCompose(this::filterOutBlocksAlreadyImported)
.thenCompose { decompressedBlocks: List<BlockFromL1RecoveredData> ->
val blockInterval = CommonDomainFunctions.blockIntervalString(
decompressedBlocks.first().header.blockNumber,
decompressedBlocks.last().header.blockNumber
)
log.debug("importing blocks={} from finalization={}", blockInterval, dataFinalizedV3.intervalString())
blockImporterAndStateVerifier
.importBlocks(decompressedBlocks)
.thenCompose { importResult ->
log.debug("imported blocks={}", dataFinalizedV3.intervalString())
assertStateMatches(importResult, dataFinalizedV3)
}
.thenApply {
BlockNumberAndHash(
number = decompressedBlocks.last().header.blockNumber,
hash = decompressedBlocks.last().header.blockHash
)
}
.thenCompose(this::filterOutBlocksAlreadyImportedAndBeyondStopSync)
.thenCompose { decompressedBlocksToImport: List<BlockFromL1RecoveredData> ->
if (decompressedBlocksToImport.isEmpty()) {
log.info(
"stopping recovery sync: imported all blocks up to debugForceSyncStopBlockNumber={} finalization={}",
debugForceSyncStopBlockNumber,
dataFinalizedV3.intervalString()
)
this.stop()
SafeFuture.completedFuture(null)
} else {
importBlocksAndAssertStateroot(decompressedBlocksToImport, dataFinalizedV3)
}
}
}
private fun filterOutBlocksAlreadyImported(
private fun importBlocksAndAssertStateroot(
decompressedBlocksToImport: List<BlockFromL1RecoveredData>,
dataFinalizedV3: DataFinalizedV3
): SafeFuture<Unit> {
val blockInterval = CommonDomainFunctions.blockIntervalString(
decompressedBlocksToImport.first().header.blockNumber,
decompressedBlocksToImport.last().header.blockNumber
)
log.debug("importing blocks={} from finalization={}", blockInterval, dataFinalizedV3.intervalString())
return blockImporterAndStateVerifier
.importBlocks(decompressedBlocksToImport)
.thenCompose { importResult ->
log.debug("imported blocks={}", dataFinalizedV3.intervalString())
assertStateMatches(importResult, dataFinalizedV3)
}
}
private fun filterOutBlocksAlreadyImportedAndBeyondStopSync(
blocks: List<BlockFromL1RecoveredData>
): SafeFuture<List<BlockFromL1RecoveredData>> {
return elClient.getBlockNumberAndHash(blockParameter = BlockParameter.Tag.LATEST)
.thenApply { headBlock ->
blocks.dropWhile { it.header.blockNumber <= headBlock.number }
var filteredBlocks = blocks.dropWhile { it.header.blockNumber <= headBlock.number }
if (debugForceSyncStopBlockNumber != null) {
filteredBlocks = filteredBlocks.takeWhile { it.header.blockNumber <= debugForceSyncStopBlockNumber }
}
filteredBlocks
}
}
@@ -163,20 +178,20 @@ class StateSynchronizerService(
): SafeFuture<Unit> {
return if (importResult.zkStateRootHash.contentEquals(finalizedV3.finalStateRootHash)) {
log.info(
"state recovered up to block={} zkStateRootHash={} finalization={}",
importResult.blockNumber,
importResult.zkStateRootHash.encodeHex(),
finalizedV3.intervalString()
"state recovered up to finalization={} zkStateRootHash={}",
finalizedV3.intervalString(),
importResult.zkStateRootHash.encodeHex()
)
SafeFuture.completedFuture(Unit)
} else {
log.error(
"stopping data recovery from L1, stateRootHash mismatch: " +
"finalization={} recoveredStateRootHash={} expected block={} to have l1 proven stateRootHash={}",
"finalization={} recovered block={} yielded recoveredStateRootHash={} expected to have " +
"l1 proven stateRootHash={}",
finalizedV3.intervalString(),
finalizedV3.finalStateRootHash.encodeHex(),
importResult.blockNumber,
importResult.zkStateRootHash.encodeHex(),
finalizedV3.endBlockNumber
finalizedV3.finalStateRootHash.encodeHex()
)
stateRootMismatchFound = true
this.stop()

View File

@@ -31,15 +31,58 @@ fun createAppAllInProcess(
blockHeaderStaticFields: BlockHeaderStaticFields,
appConfig: StateRecoveryApp.Config
): StateRecoveryApp {
return createAppClients(
vertx = vertx,
meterRegistry = meterRegistry,
stateManagerClientEndpoint = stateManagerClientEndpoint,
l1RpcEndpoint = l1RpcEndpoint,
blobScanEndpoint = blobScanEndpoint,
appConfig = appConfig
).let { clients ->
val app = StateRecoveryApp(
vertx = vertx,
lineaContractClient = clients.lineaContractClient,
ethLogsSearcher = clients.ethLogsSearcher,
blobFetcher = clients.blobScanClient,
elClient = elClient,
stateManagerClient = clients.stateManagerClient,
transactionDetailsClient = clients.transactionDetailsClient,
blockHeaderStaticFields = blockHeaderStaticFields,
config = appConfig
)
app
}
}
data class AppClients(
val lineaContractClient: Web3JLineaRollupSmartContractClientReadOnly,
val ethLogsSearcher: Web3JLogsSearcher,
val blobScanClient: BlobScanClient,
val stateManagerClient: StateManagerClientV1,
val transactionDetailsClient: TransactionDetailsClient
)
fun createAppClients(
vertx: Vertx = Vertx.vertx(),
meterRegistry: MeterRegistry = BackendRegistries.getDefaultNow(),
l1RpcEndpoint: URI,
l1RpcRequestRetryConfig: RequestRetryConfig = RequestRetryConfig(backoffDelay = 1.seconds),
blobScanEndpoint: URI,
blobScanRequestRetryConfig: RequestRetryConfig = RequestRetryConfig(backoffDelay = 1.seconds),
stateManagerClientEndpoint: URI,
stateManagerRequestRetry: RequestRetryConfig = RequestRetryConfig(backoffDelay = 1.seconds),
zkStateManagerVersion: String = "2.3.0",
appConfig: StateRecoveryApp.Config
): AppClients {
val lineaContractClient = Web3JLineaRollupSmartContractClientReadOnly(
contractAddress = appConfig.smartContractAddress,
web3j = createWeb3jHttpClient(
rpcUrl = l1RpcEndpoint.toString(),
log = LogManager.getLogger("linea.plugin.staterecover.clients.l1.smart-contract")
log = LogManager.getLogger("linea.plugin.staterecovery.clients.l1.smart-contract")
)
)
val ethLogsSearcher = run {
val log = LogManager.getLogger("linea.plugin.staterecover.clients.l1.logs-searcher")
val log = LogManager.getLogger("linea.plugin.staterecovery.clients.l1.logs-searcher")
Web3JLogsSearcher(
vertx = vertx,
web3jClient = createWeb3jHttpClient(
@@ -52,43 +95,29 @@ fun createAppAllInProcess(
val blobScanClient = BlobScanClient.create(
vertx = vertx,
endpoint = blobScanEndpoint,
requestRetryConfig = RequestRetryConfig(
backoffDelay = 1.seconds
),
logger = LogManager.getLogger("linea.plugin.staterecover.clients.l1.blob-scan")
requestRetryConfig = blobScanRequestRetryConfig,
logger = LogManager.getLogger("linea.plugin.staterecovery.clients.l1.blob-scan")
)
val jsonRpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, MicrometerMetricsFacade(meterRegistry))
val stateManagerClient: StateManagerClientV1 = StateManagerV1JsonRpcClient.create(
rpcClientFactory = jsonRpcClientFactory,
endpoints = listOf(stateManagerClientEndpoint),
maxInflightRequestsPerClient = 10u,
requestRetry = RequestRetryConfig(
backoffDelay = 1.seconds
),
zkStateManagerVersion = "2.3.0",
logger = LogManager.getLogger("linea.plugin.staterecover.clients.state-manager")
requestRetry = stateManagerRequestRetry,
zkStateManagerVersion = zkStateManagerVersion,
logger = LogManager.getLogger("linea.plugin.staterecovery.clients.state-manager")
)
val transactionDetailsClient: TransactionDetailsClient = VertxTransactionDetailsClient.create(
jsonRpcClientFactory = jsonRpcClientFactory,
endpoint = l1RpcEndpoint,
retryConfig = RequestRetryConfig(
backoffDelay = 1.seconds
),
logger = LogManager.getLogger("linea.plugin.staterecover.clients.l1.transaction-details")
retryConfig = l1RpcRequestRetryConfig,
logger = LogManager.getLogger("linea.plugin.staterecovery.clients.l1.transaction-details")
)
val app = StateRecoveryApp(
vertx = vertx,
return AppClients(
lineaContractClient = lineaContractClient,
ethLogsSearcher = ethLogsSearcher,
blobFetcher = blobScanClient,
elClient = elClient,
blobScanClient = blobScanClient,
stateManagerClient = stateManagerClient,
transactionDetailsClient = transactionDetailsClient,
blockHeaderStaticFields = blockHeaderStaticFields,
config = appConfig
transactionDetailsClient = transactionDetailsClient
)
return app
}

View File

@@ -71,7 +71,8 @@ open class LineaStateRecoveryPlugin : BesuPlugin {
miningService = serviceManager.getServiceOrThrow(MiningService::class.java),
recoveryStatePersistence = this.recoveryStatusPersistence,
synchronizationService = synchronizationService,
headBlockNumber = blockchainService.chainHeadHeader.number.toULong()
headBlockNumber = blockchainService.chainHeadHeader.number.toULong(),
debugForceSyncStopBlockNumber = config.debugForceSyncStopBlockNumber
)
val simulatorService = serviceManager.getServiceOrThrow(BlockSimulationService::class.java)
val executionLayerClient = ExecutionLayerInProcessClient.create(
@@ -96,7 +97,8 @@ open class LineaStateRecoveryPlugin : BesuPlugin {
smartContractAddress = config.l1SmartContractAddress.toString(),
l1LatestSearchBlock = net.consensys.linea.BlockParameter.Tag.LATEST,
overridingRecoveryStartBlockNumber = config.overridingRecoveryStartBlockNumber,
l1PollingInterval = config.l1PollingInterval
l1PollingInterval = config.l1PollingInterval,
debugForceSyncStopBlockNumber = config.debugForceSyncStopBlockNumber
)
)
}
@@ -105,17 +107,17 @@ open class LineaStateRecoveryPlugin : BesuPlugin {
serviceManager
.getServiceOrThrow(BesuEvents::class.java)
.addBlockAddedListener(recoveryModeManager)
this.stateRecoverApp.start().get()
log.info(
"started: recoveryStartBlockNumber={}",
this.recoveryStatusPersistence.getRecoveryStartBlockNumber()
)
}
override fun afterExternalServicePostMainLoop() {
// we need to recall this again because Sync and Mining services
// may have been started after the plugin start
this.recoveryModeManager.enableRecoveryModeIfNecessary()
log.info(
"started: recoveryStartBlockNumber={}",
this.recoveryStatusPersistence.getRecoveryStartBlockNumber()
)
this.stateRecoverApp.start().get()
}
override fun stop() {

View File

@@ -14,7 +14,8 @@ data class PluginConfig(
val blobscanEndpoint: URI,
val shomeiEndpoint: URI,
val l1PollingInterval: kotlin.time.Duration,
val overridingRecoveryStartBlockNumber: ULong? = null
val overridingRecoveryStartBlockNumber: ULong? = null,
val debugForceSyncStopBlockNumber: ULong? = null
) {
init {
require(l1PollingInterval >= 1.milliseconds) { "Polling interval=$l1PollingInterval must be greater than 1ms." }
@@ -84,6 +85,17 @@ class PluginCliOptions {
)
var overridingRecoveryStartBlockNumber: Long? = null
@CommandLine.Option(
names = ["--$cliOptionsPrefix-debug-force-sync-stop-block-number"],
description = [
"Forces Besu to stop syncing at the given block number. " +
"This is mean for testing purposes, not production. Must be greater than or equal to 1."
],
defaultValue = "\${env:STATERECOVERY_DEBUG_FORCE_STOP_SYNC_BLOCK_NUMBER}",
required = false
)
var debugForceSyncStopBlockNumber: Long? = null
fun getConfig(): PluginConfig {
require(overridingRecoveryStartBlockNumber == null || overridingRecoveryStartBlockNumber!! >= 1) {
"overridingRecoveryStartBlockNumber=$overridingRecoveryStartBlockNumber must be greater than or equal to 1"
@@ -95,7 +107,8 @@ class PluginCliOptions {
blobscanEndpoint = blobscanEndpoint,
shomeiEndpoint = shomeiEndpoint,
l1PollingInterval = l1PollingInterval.toKotlinDuration(),
overridingRecoveryStartBlockNumber = overridingRecoveryStartBlockNumber?.toULong()
overridingRecoveryStartBlockNumber = overridingRecoveryStartBlockNumber?.toULong(),
debugForceSyncStopBlockNumber = debugForceSyncStopBlockNumber?.toULong()
)
}

View File

@@ -15,6 +15,7 @@ class RecoveryModeManager(
private val p2pService: P2PService,
private val miningService: MiningService,
private val recoveryStatePersistence: RecoveryStatusPersistence,
private val debugForceSyncStopBlockNumber: ULong? = null,
headBlockNumber: ULong
) :
BesuEvents.BlockAddedListener {
@@ -52,11 +53,26 @@ class RecoveryModeManager(
val blockNumber = addedBlockContext.blockHeader.number
headBlockNumber = blockNumber.toULong()
if (!recoveryModeTriggered.get() && hasReachedTargetBlock()) {
log.info(
"Stopping synchronization services at block={} recoveryTargetBlockNumber={} was reached",
headBlockNumber,
targetBlockNumber
)
switchToRecoveryMode()
} else if (debugForceSyncStopBlockNumber != null && headBlockNumber >= debugForceSyncStopBlockNumber) {
log.info(
"Stopping synchronization services at block={} debugForceSyncStopBlockNumber={}",
headBlockNumber,
debugForceSyncStopBlockNumber
)
stopBesuServices()
}
}
private fun hasReachedTargetBlock(): Boolean {
private fun hasReachedTargetBlock(
headBlockNumber: ULong = this.headBlockNumber,
targetBlockNumber: ULong? = this.targetBlockNumber
): Boolean {
return (headBlockNumber + 1u) >= (targetBlockNumber ?: ULong.MAX_VALUE)
}
@@ -79,37 +95,30 @@ class RecoveryModeManager(
}
}
val effectiveRecoveryStartBlockNumber = if (targetBlockNumber <= headBlockNumber + 1u) {
val effectiveRecoveryStartBlockNumber = headBlockNumber + 1u
log.warn(
"enabling recovery mode immediately at blockNumber={} recoveryTargetBlockNumber={} headBlockNumber={}",
effectiveRecoveryStartBlockNumber,
targetBlockNumber,
headBlockNumber
)
switchToRecoveryMode()
effectiveRecoveryStartBlockNumber
} else {
targetBlockNumber
}
val effectiveRecoveryStartBlockNumber =
if (hasReachedTargetBlock(headBlockNumber, targetBlockNumber)) {
val effectiveRecoveryStartBlockNumber = headBlockNumber + 1u
effectiveRecoveryStartBlockNumber
} else {
targetBlockNumber
}
recoveryStatePersistence.saveRecoveryStartBlockNumber(effectiveRecoveryStartBlockNumber)
enableRecoveryModeIfNecessary()
}
/** Switches the node to recovery mode. */
private fun switchToRecoveryMode() {
log.warn("Stopping synchronization service")
synchronizationService.stop()
log.warn("Stopping P2P discovery service")
p2pService.disableDiscovery()
log.warn("Stopping mining service")
miningService.stop()
log.info(
"switched to state recovery mode at block={}",
headBlockNumber
)
stopBesuServices()
recoveryModeTriggered.set(true)
}
private fun stopBesuServices() {
log.info("Stopping synchronization service")
synchronizationService.stop()
log.info("Stopping P2P discovery service")
p2pService.disableDiscovery()
log.info("Stopping mining service")
miningService.stop()
}
}

View File

@@ -28,21 +28,22 @@ object TransactionMapper {
chainId: ULong
): Transaction {
val builder = Transaction.builder()
builder
.sender(Address.fromHexString(transaction.from.encodeHex()))
.nonce(transaction.nonce.toLong())
.gasLimit(transaction.gasLimit.toLong())
.value(Wei.of(transaction.value))
.payload(Bytes.wrap())
.chainId(chainId.toBigInteger())
// compressed transaction don't have signature,
// so we use a dummy signature which is not verified by Besu in RecoveryMode
.signature(SECPSignature(BigInteger.ZERO, BigInteger.ZERO, 0.toByte()))
transaction.data?.let { data -> builder.payload(Bytes.wrap(data)) }
transaction.to?.let { builder.to(Address.fromHexString(it.encodeHex())) }
transaction.gasPrice?.let { builder.gasPrice(Wei.of(it)) }
transaction.maxPriorityFeePerGas?.let { builder.maxPriorityFeePerGas(Wei.of(it)) }
transaction.maxFeePerGas?.let { builder.maxFeePerGas(Wei.of(it)) }
transaction.accessList?.let { builder.accessList(mapAccessListEntries(it)) }
builder.signature(SECPSignature(BigInteger.ZERO, BigInteger.ZERO, 0.toByte()))
return builder.build()
}

View File

@@ -86,7 +86,7 @@ class StateRecoveryE2ETest {
private fun freshStartOfStack() {
log.debug("restarting the stack")
Runner.executeCommandFailOnNonZeroExitCode(
command = "make fresh-start-all-staterecovery",
command = "make start-env-with-staterecovery",
envVars = mapOf(
"L1_GENESIS_TIME" to Clock.System.now().plus(5.seconds).epochSeconds.toString()
),

View File

@@ -53,6 +53,10 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
private lateinit var contractClientForBlobSubmissions: LineaRollupSmartContractClient
private lateinit var contractClientForAggregationSubmissions: LineaRollupSmartContractClient
private lateinit var blobScanClient: BlobScanClient
private lateinit var logsSearcher: Web3JLogsSearcher
private lateinit var vertx: Vertx
private val testDataDir = run {
"testdata/coordinator/prover/v3"
}
@@ -62,6 +66,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
@BeforeEach
fun beforeEach(vertx: Vertx) {
this.vertx = vertx
val jsonRpcFactory = VertxHttpJsonRpcClientFactory(
vertx = vertx,
metricsFacade = MicrometerMetricsFacade(SimpleMeterRegistry())
@@ -98,7 +103,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
),
contractAddress = rollupDeploymentResult.contractAddress
)
val logsSearcher = Web3JLogsSearcher(
this.logsSearcher = Web3JLogsSearcher(
vertx = vertx,
web3jClient = Web3jClientManager.buildL1Client(
log = LogManager.getLogger("test.clients.l1.events-fetcher"),
@@ -118,7 +123,7 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
rollupDeploymentResult.rollupOperators[1].txManager,
smartContractErrors = lineaRollupContractErrors
)
val blobScanClient = BlobScanClient.create(
this.blobScanClient = BlobScanClient.create(
vertx = vertx,
endpoint = URI(blobScanUrl),
requestRetryConfig = RequestRetryConfig(
@@ -129,6 +134,28 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
logger = LogManager.getLogger("test.clients.l1.blobscan")
)
instantiateStateRecoveryApp()
configureLoggers(
rootLevel = Level.INFO,
log.name to Level.INFO,
"linea.testing.submission" to Level.INFO,
"net.consensys.linea.contract.Web3JContractAsyncHelper" to Level.WARN, // silence noisy gasPrice Caps logs
"test.clients.l1.executionlayer" to Level.DEBUG,
"test.clients.l1.web3j-default" to Level.INFO,
"test.clients.l1.state-manager" to Level.INFO,
"test.clients.l1.transaction-details" to Level.INFO,
"test.clients.l1.linea-contract" to Level.INFO,
"test.clients.l1.events-fetcher" to Level.INFO,
"test.clients.l1.blobscan" to Level.INFO,
"net.consensys.linea.contract.l1" to Level.INFO,
"test.fake.clients.l1.fake-execution-layer" to Level.INFO
)
}
fun instantiateStateRecoveryApp(
debugForceSyncStopBlockNumber: ULong? = null
) {
stateRecoverApp = StateRecoveryApp(
vertx = vertx,
elClient = executionLayerClient,
@@ -142,24 +169,10 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
l1LatestSearchBlock = BlockParameter.Tag.LATEST,
l1PollingInterval = 10.milliseconds,
executionClientPollingInterval = 1.seconds,
smartContractAddress = lineaContractClient.getAddress()
smartContractAddress = lineaContractClient.getAddress(),
debugForceSyncStopBlockNumber = debugForceSyncStopBlockNumber
)
)
configureLoggers(
rootLevel = Level.INFO,
log.name to Level.INFO,
"net.consensys.linea.contract.Web3JContractAsyncHelper" to Level.WARN, // silence noisy gasPrice Caps logs
"test.clients.l1.executionlayer" to Level.DEBUG,
"test.clients.l1.web3j-default" to Level.INFO,
"test.clients.l1.state-manager" to Level.INFO,
"test.clients.l1.transaction-details" to Level.INFO,
"test.clients.l1.linea-contract" to Level.INFO,
"test.clients.l1.events-fetcher" to Level.INFO,
"test.clients.l1.blobscan" to Level.INFO,
"net.consensys.linea.contract.l1" to Level.INFO,
"test.fake.clients.l1.fake-execution-layer" to Level.INFO
)
}
private fun submitDataToL1ContactAndWaitExecution(
@@ -377,4 +390,23 @@ class StateRecoveryAppWithFakeExecutionClientIntTest {
assertThat(executionLayerClient.headBlock.number)
.isEqualTo(aggregationsAndBlobs[1].aggregation!!.endBlockNumber)
}
@Test
fun `should stop synch at forceSyncStopBlockNumber`() {
val debugForceSyncStopBlockNumber = aggregationsAndBlobs[2].aggregation!!.startBlockNumber
instantiateStateRecoveryApp(debugForceSyncStopBlockNumber = debugForceSyncStopBlockNumber)
log.debug("forceSyncStopBlockNumber={}", fakeStateManagerClient)
stateRecoverApp.start().get()
submitDataToL1ContactAndWaitExecution(waitTimeout = 1.minutes)
await()
.atMost(1.minutes.toJavaDuration())
.untilAsserted {
println(executionLayerClient.headBlock.number)
assertThat(executionLayerClient.headBlock.number).isGreaterThanOrEqualTo(debugForceSyncStopBlockNumber)
}
assertThat(executionLayerClient.headBlock.number).isEqualTo(debugForceSyncStopBlockNumber)
}
}