coordinator: improve web3j request error handling and fix message anchoring Event serialization (#903)

* coordinator: improve web3j request error handling

* coordinator: order MessageSentEvent.kt params for better reading

* coordinator: fix MessageSentEvent.kt serialization

* coordinator: simplify rejectOnJsonRpcError function typing

* coordinator: tiny optimization on L1MessageSentEventsFetcher
This commit is contained in:
Fluent Crafter
2025-04-28 18:28:22 +01:00
committed by GitHub
parent a272eb16a3
commit cfadda3cf9
10 changed files with 92 additions and 58 deletions

View File

@@ -79,7 +79,7 @@ internal class L1MessageSentEventsFetcher(
),
result.intervalString()
)
result.logs.map(MessageSentEvent::fromEthLog)
events
}
}
}

View File

@@ -5,6 +5,7 @@ import linea.domain.EthLogEvent
import linea.kotlin.encodeHex
import linea.kotlin.sliceOf32
import linea.kotlin.toULongFromLast8Bytes
import java.math.BigInteger
/**
* @notice Emitted when a message is sent.
@@ -29,11 +30,11 @@ event MessageSent(
);
*/
data class MessageSentEvent(
val messageNumber: ULong, // Unique message number
val from: ByteArray, // Address of the sender
val to: ByteArray, // Address of the recipient
val fee: ULong, // Fee paid in Wei
val value: ULong, // Value sent in Wei
val messageNumber: ULong, // Unique message number
val fee: BigInteger, // Fee paid in Wei
val value: BigInteger, // Value sent in Wei
val calldata: ByteArray, // Calldata passed to the recipient
val messageHash: ByteArray // Hash of the message parameters
) : Comparable<MessageSentEvent> {
@@ -43,12 +44,12 @@ data class MessageSentEvent(
fun fromEthLog(ethLog: EthLog): EthLogEvent<MessageSentEvent> {
return EthLogEvent(
event = MessageSentEvent(
messageNumber = ethLog.data.sliceOf32(sliceNumber = 2).toULongFromLast8Bytes(),
from = ethLog.topics[1].sliceArray(12..31),
to = ethLog.topics[2].sliceArray(12..31),
fee = ethLog.data.sliceOf32(sliceNumber = 0).toULongFromLast8Bytes(),
value = ethLog.data.sliceOf32(sliceNumber = 1).toULongFromLast8Bytes(),
messageNumber = ethLog.data.sliceOf32(sliceNumber = 3).toULongFromLast8Bytes(),
calldata = ethLog.data.sliceArray(32 * 4..ethLog.data.size - 1),
fee = BigInteger(ethLog.data.sliceOf32(sliceNumber = 0)),
value = BigInteger(ethLog.data.sliceOf32(sliceNumber = 1)),
calldata = ethLog.data.sliceArray(32 * 3..ethLog.data.size - 1),
messageHash = ethLog.topics[3]
),
log = ethLog
@@ -86,11 +87,11 @@ data class MessageSentEvent(
override fun toString(): String {
return "MessageSentEvent(" +
"messageNumber=$messageNumber, " +
"from=${from.encodeHex()}, " +
"to=${to.encodeHex()}, " +
"fee=$fee, " +
"value=$value, " +
"nonce=$messageNumber, " +
"calldata=${calldata.encodeHex()}, " +
"messageHash=${messageHash.encodeHex()}" +
")"

View File

@@ -22,7 +22,6 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@@ -185,8 +184,10 @@ class MessageAnchoringAppTest {
val anchoringApp = createApp(
l1PollingInterval = 1.milliseconds,
l1EventSearchBlockChunk = 10u,
l1EventPollingTimeout = 50.milliseconds,
l1SuccessBackoffDelay = 20.milliseconds,
// for this scenario l1EventPollingTimeout < l1SuccessBackoffDelay
// to simulate timeout after the 1st response and return, doing next request on next tick
l1EventPollingTimeout = 1.milliseconds,
l1SuccessBackoffDelay = 3.milliseconds,
maxMessagesToAnchorPerL2Transaction = 50u,
anchoringTickInterval = 20.milliseconds
)
@@ -196,7 +197,7 @@ class MessageAnchoringAppTest {
anchoringApp.start().get()
await()
.atMost(10.minutes.toJavaDuration())
.atMost(10.seconds.toJavaDuration())
.untilAsserted {
assertThat(l2MessageService.getLastAnchoredL1MessageNumber(block = BlockParameter.Tag.LATEST).get())
.isEqualTo(ethLogs.last().l1RollingHashUpdated.event.messageNumber)

View File

@@ -35,7 +35,6 @@ fun createMessageSentEthLogV1(
"0x" +
fee.toHexStringUInt256(hexPrefix = false) +
value.toHexStringUInt256(hexPrefix = false) +
"0000000000000000000000000000000000000000000000000000000000000000" + // padding
messageNumber.toHexStringUInt256(hexPrefix = false) +
calldata.encodeHex(prefix = false) // calldata
).decodeHex(),

View File

@@ -3,35 +3,61 @@ package linea.anchoring.events
import linea.domain.EthLog
import linea.domain.EthLogEvent
import linea.kotlin.decodeHex
import linea.kotlin.toBigInteger
import linea.kotlin.toULongFromHex
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.math.BigInteger
class MessageSentEventTest {
@Test
fun `fromEthLog should map EthLog to MessageSentEvent correctly`() {
fun `fromEthLog should map EthLog to MessageSentEvent correctly 2`() {
/**
* curl -s -H 'content-type:application/json' --data '{"jsonrpc":"2.0","id":"53","method":"eth_getLogs","params":[{"address":["0xB218f8A4Bc926cF1cA7b3423c154a0D627Bdb7E5"], "topics":["0xe856c2b8bd4eb0027ce32eeaf595c21b0b6b4644b326e5b7bd80a1cf8db72e6c",null,null,"0x6011E7F01AAE0E2CD2650CBE229A330F93821D5ED8A2E8830D1E64BA3C76CC3F"], "fromBlock":"earliest","toBlock":"latest"}]}' $URL_SEPOLIA | jq '.result'
* [
* {
* "removed": false,
* "logIndex": "0x8c",
* "transactionIndex": "0x36",
* "transactionHash": "0x7fd853e580f967cf9004a019bcce26b8d4d2b880ef1106f5af29e2e23695e9ec",
* "blockHash": "0x8c08c5c7f1d8a256a392f5d261b9e4ba39337903646ba009f16a0b2029447c20",
* "blockNumber": "0x7cec04",
* "address": "0xb218f8a4bc926cf1ca7b3423c154a0d627bdb7e5",
* "data": "0x000000000000000000000000000000000000000000000000002386f26fc100000000000000000000000000000000000000000000000000000de0b6b3a764000000000000000000000000000000000000000000000000000000000000000140430000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000001d54686973206973206a75737420666f7220796f7520676f6f6420736972000000",
* "topics": [
* "0xe856c2b8bd4eb0027ce32eeaf595c21b0b6b4644b326e5b7bd80a1cf8db72e6c",
* "0x00000000000000000000000017e764ba16c95815ca06fb5d174f08d842e340df",
* "0x0000000000000000000000007a98052d4be677df72ede5a3f2829c893d10388d",
* "0x6011e7f01aae0e2cd2650cbe229a330f93821d5ed8a2e8830d1e64ba3c76cc3f"
* ]
* }
* ]
*/
val callData = "0000000000000000000000000000000000000000000000000000000000000080" +
"000000000000000000000000000000000000000000000000000000000000001d" +
"54686973206973206a75737420666f7220796f7520676f6f6420736972000000"
val ethLog = EthLog(
removed = false,
logIndex = "0x1".toULongFromHex(),
transactionIndex = "0x0".toULongFromHex(),
transactionHash = "0x2d408675b46835a04ba632ac437ca9b9ca41b834609b7453630fe594ba658b4c".decodeHex(),
blockHash = "0x4d63489ac2faee706cca0f078f23973facc42a87dc75cfdf6fae5ac2d8c9b243".decodeHex(),
blockNumber = "0x1109669".toULongFromHex(),
address = "0x508ca82df566dcd1b0de8296e70a96332cd644ec".decodeHex(),
transactionHash = "0x7fd853e580f967cf9004a019bcce26b8d4d2b880ef1106f5af29e2e23695e9ec".decodeHex(),
blockHash = "0x8c08c5c7f1d8a256a392f5d261b9e4ba39337903646ba009f16a0b2029447c20".decodeHex(),
blockNumber = "0x7cec04".toULongFromHex(),
address = "0xb218f8a4bc926cf1ca7b3423c154a0d627bdb7e5".decodeHex(),
data = (
"0x" +
"00000000000000000000000000000000000000000000000000000000000003e8" + // fee
"00000000000000000000000000000000000000000000000000000000000007d0" + // value
"0000000000000000000000000000000000000000000000000000000000000000" + // padding
"0000000000000000000000000000000000000000000000000000000000002710" + // nonce
"deadbeef"
).decodeHex(), // calldata
"000000000000000000000000000000000000000000000000002386f26fc10000" + // fee
"0000000000000000000000000000000000000000000000000de0b6b3a7640000" + // value
"0000000000000000000000000000000000000000000000000000000000014043" + // messageNumber
// calldata
callData
).decodeHex(),
topics = listOf(
"0xe856c2b8bd4eb0027ce32eeaf595c21b0b6b4644b326e5b7bd80a1cf8db72e6c".decodeHex(),
"0x0000000000000000000000001234567890abcdef1234567890abcdef12345678".decodeHex(), // from
"0x000000000000000000000000abcdef1234567890abcdef1234567890abcdef12".decodeHex(), // to
"0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd".decodeHex() // messageHash
"0x00000000000000000000000017e764ba16c95815ca06fb5d174f08d842e340df".decodeHex(), // from
"0x0000000000000000000000007a98052d4be677df72ede5a3f2829c893d10388d".decodeHex(), // to
"0x6011e7f01aae0e2cd2650cbe229a330f93821d5ed8a2e8830d1e64ba3c76cc3f".decodeHex() // messageHash
)
)
@@ -40,13 +66,13 @@ class MessageSentEventTest {
// Then
val expectedEvent = MessageSentEvent(
from = "0x1234567890abcdef1234567890abcdef12345678".decodeHex(),
to = "0xabcdef1234567890abcdef1234567890abcdef12".decodeHex(),
fee = 1000uL,
value = 2000uL,
messageNumber = 10000uL,
calldata = "deadbeef".decodeHex(),
messageHash = "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd".decodeHex()
messageNumber = 81987UL,
from = "0x17e764ba16c95815ca06fb5d174f08d842e340df".decodeHex(),
to = "0x7a98052d4be677df72ede5a3f2829c893d10388d".decodeHex(),
fee = BigInteger("2386f26fc10000", 16),
value = BigInteger("de0b6b3a7640000", 16),
calldata = "0x$callData".decodeHex(),
messageHash = "0x6011e7f01aae0e2cd2650cbe229a330f93821d5ed8a2e8830d1e64ba3c76cc3f".decodeHex()
)
val expectedEthLogEvent = EthLogEvent(
event = expectedEvent,
@@ -57,11 +83,11 @@ class MessageSentEventTest {
}
private val eventTemplate = MessageSentEvent(
messageNumber = 1uL,
from = "0x000000000000000000000000000000000000000a".decodeHex(),
to = "0x000000000000000000000000000000000000000b".decodeHex(),
fee = 0uL,
value = 0uL,
messageNumber = 1uL,
fee = 0uL.toBigInteger(),
value = 0uL.toBigInteger(),
calldata = ByteArray(0),
messageHash = "0x0000000000000000000000000000000000000000".decodeHex()
)

View File

@@ -148,7 +148,7 @@ class EthLogsSearcherImplIntTest {
).get()
}
.hasCauseInstanceOf(RuntimeException::class.java)
.hasMessageContaining("json-rpc error: code=-32000 message=Error: unable to retrieve logs")
.hasMessageContaining("eth_getLogs failed with JsonRpcError code=-32000 message=Error: unable to retrieve logs")
}
@Test

View File

@@ -1,7 +1,10 @@
package linea.web3j
import linea.domain.Constants
import org.web3j.crypto.Blob
fun padBlobForEip4844Submission(blob: ByteArray): ByteArray {
return ByteArray(Constants.Eip4844BlobSize).apply { blob.copyInto(this) }
}
fun ByteArray.toWeb3jTxBlob(): Blob = Blob(padBlobForEip4844Submission(this))
fun List<ByteArray>.toWeb3jTxBlob(): List<Blob> = map { it.toWeb3jTxBlob() }

View File

@@ -1,31 +1,40 @@
package linea.web3j
import net.consensys.linea.async.toSafeFuture
import org.web3j.protocol.core.RemoteFunctionCall
import org.web3j.protocol.core.Request
import org.web3j.protocol.core.Response
import tech.pegasys.teku.infrastructure.async.SafeFuture
fun <T> handleError(
response: Response<T>
): SafeFuture<T> {
fun <Resp> rejectOnJsonRpcError(
rpcMethod: String,
response: Resp
): SafeFuture<Resp>
where Resp : Response<*> {
return if (response.hasError()) {
SafeFuture.failedFuture(
RuntimeException(
"json-rpc error: code=${response.error.code} message=${response.error.message} " +
"data=${response.error.data}"
"$rpcMethod failed with JsonRpcError " +
"code=${response.error.code} message=${response.error.message} data=${response.error.data}"
)
)
} else {
SafeFuture.completedFuture(response.result)
SafeFuture.completedFuture(response)
}
}
fun <Resp, RespT, T> Request<*, Resp>.requestAsync(
mapperFn: (RespT) -> T
mapperFn: (Resp) -> T
): SafeFuture<T>
where Resp : Response<RespT> {
return this.sendAsync()
.thenCompose(::handleError)
.thenCompose { response -> rejectOnJsonRpcError(this.method, response) }
.toSafeFuture()
.thenApply(mapperFn)
}
fun <R, T> RemoteFunctionCall<R>.requestAsync(mapperFn: (R) -> T): SafeFuture<T> {
return sendAsync()
.toSafeFuture()
.thenApply(mapperFn)
}

View File

@@ -7,11 +7,9 @@ import linea.domain.EthLog
import linea.ethapi.EthApiClient
import linea.web3j.domain.toDomain
import linea.web3j.domain.toWeb3j
import linea.web3j.handleError
import linea.web3j.mapToDomainWithTxHashes
import linea.web3j.requestAsync
import linea.web3j.toDomain
import net.consensys.linea.async.toSafeFuture
import org.web3j.protocol.Web3j
import org.web3j.protocol.core.methods.request.EthFilter
import org.web3j.protocol.core.methods.response.Log
@@ -28,13 +26,13 @@ class Web3jEthApiClient(
override fun getBlockByNumber(blockParameter: BlockParameter): SafeFuture<Block?> {
return web3jClient
.ethGetBlockByNumber(blockParameter.toWeb3j(), true)
.requestAsync { block -> block?.toDomain() }
.requestAsync { resp -> resp.block?.toDomain() }
}
override fun getBlockByNumberWithoutTransactionsData(blockParameter: BlockParameter): SafeFuture<BlockWithTxHashes?> {
return web3jClient
.ethGetBlockByNumber(blockParameter.toWeb3j(), false)
.requestAsync { block -> block?.let(::mapToDomainWithTxHashes) }
.requestAsync { resp -> resp.block?.let(::mapToDomainWithTxHashes) }
}
override fun getLogs(
@@ -53,13 +51,10 @@ class Web3jEthApiClient(
return web3jClient
.ethGetLogs(ethFilter)
.sendAsync()
.toSafeFuture()
.thenCompose(::handleError)
.thenApply { logsResponse ->
if (logsResponse != null) {
.requestAsync { logsResponse ->
if (logsResponse.logs != null) {
@Suppress("UNCHECKED_CAST")
(logsResponse as List<org.web3j.protocol.core.methods.response.EthLog.LogResult<Log>>)
(logsResponse.logs as List<org.web3j.protocol.core.methods.response.EthLog.LogResult<Log>>)
.map { logResult -> logResult.get().toDomain() }
} else {
emptyList()

View File

@@ -75,7 +75,7 @@ class AsyncFriendlyTransactionManager : RawTransactionManager {
fromAddress,
blockParameter
)
.requestAsync { setNonce(it.drop(2).toBigInteger(16)) }
.requestAsync { setNonce(it.transactionCount) }
}
fun currentNonce(): BigInteger {