Tried improving the queue usage for the anchoring (#924)

* Tried improving the queue usage for the anchoring

* Removed useless log

* coordinator: message anchoring - fix test after queue refactor (#928)

* coordinator: message anchoring - fix test after small refactor ot use Deque

* Changed messagesToFetch to logsSoftLimit, changed test verifications a bit

* Changed messagesToFetch to logsSoftLimit, changed test verifications a bit

* Addressed a few more comments

---------

Co-authored-by: Fluent Crafter <205769460+fluentcrafter@users.noreply.github.com>
This commit is contained in:
Roman Vaseev
2025-05-06 08:18:47 +02:00
committed by GitHub
parent 004d459e77
commit a84d16dce3
7 changed files with 65 additions and 39 deletions

View File

@@ -10,10 +10,9 @@ import linea.domain.RetryConfig
import linea.ethapi.EthApiClient
import linea.ethapi.EthLogsSearcherImpl
import net.consensys.zkevm.LongRunningService
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.util.Deque
import java.util.concurrent.CompletableFuture
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.LinkedBlockingDeque
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
@@ -21,9 +20,8 @@ import kotlin.time.Duration.Companion.seconds
class MessageAnchoringApp(
private val vertx: Vertx,
private val config: Config,
private val l1EthApiClient: EthApiClient,
private val l2MessageService: L2MessageServiceSmartContractClient,
private val log: Logger = LogManager.getLogger(MessageAnchoringApp::class.java)
l1EthApiClient: EthApiClient,
private val l2MessageService: L2MessageServiceSmartContractClient
) : LongRunningService {
data class Config(
val l1RequestRetryConfig: RetryConfig,
@@ -45,7 +43,9 @@ class MessageAnchoringApp(
config = EthLogsSearcherImpl.Config(loopSuccessBackoffDelay = config.l1SuccessBackoffDelay)
)
private val eventsQueue = PriorityBlockingQueue<MessageSentEvent>(config.messageQueueCapacity.toInt())
private val eventsQueue: Deque<MessageSentEvent> = LinkedBlockingDeque()
internal val eventsQueueSize: Int
get() = eventsQueue.size
private val l1EventsPoller = run {
L1MessageSentEventsPoller(

View File

@@ -12,15 +12,15 @@ import net.consensys.zkevm.PeriodicPollingService
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.PriorityBlockingQueue
import java.util.Queue
import kotlin.time.Duration
class MessageAnchoringService(
private val vertx: Vertx,
vertx: Vertx,
private val l1ContractAddress: String,
private val l1EthLogsClient: EthLogsClient,
private val l2MessageService: L2MessageServiceSmartContractClient,
private val eventsQueue: PriorityBlockingQueue<MessageSentEvent>,
private val eventsQueue: Queue<MessageSentEvent>,
private val maxMessagesToAnchorPerL2Transaction: UInt,
private val l2HighestBlockTag: BlockParameter,
anchoringTickInterval: Duration,
@@ -39,10 +39,7 @@ class MessageAnchoringService(
eventsQueue.removeIf { it.messageNumber <= lastAnchoredL1MessageNumber }
eventsQueue
.toArray(emptyArray<MessageSentEvent>())
.filter { it.messageNumber > lastAnchoredL1MessageNumber }
// needs sorting because PriorityBlockingQueue#toArray does not guarantee order
.sortedBy { it.messageNumber }
.toTypedArray()
.take(maxMessagesToAnchorPerL2Transaction.toInt())
}.thenCompose { eventsToAnchor ->
if (eventsToAnchor.isEmpty()) {

View File

@@ -41,7 +41,7 @@ internal class L1MessageSentEventsFetcher(
fun findL1MessageSentEvents(
startingMessageNumber: ULong,
messagesToFetch: UInt,
targetMessagesToFetch: UInt,
fetchTimeout: Duration,
blockChunkSize: UInt
): SafeFuture<List<EthLogEvent<MessageSentEvent>>> {
@@ -67,7 +67,7 @@ internal class L1MessageSentEventsFetcher(
),
chunkSize = blockChunkSize,
searchTimeout = fetchTimeout,
stopAfterTargetLogsCount = messagesToFetch
stopAfterTargetLogsCount = targetMessagesToFetch
).thenApply { result ->
lastSearch.set(LastSearch(result.endBlockNumber, startingMessageNumber))
val events = result.logs.map(MessageSentEvent::fromEthLog)

View File

@@ -9,7 +9,7 @@ import net.consensys.zkevm.PeriodicPollingService
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.PriorityBlockingQueue
import java.util.Deque
import kotlin.time.Duration
class L1MessageSentEventsPoller(
@@ -17,7 +17,7 @@ class L1MessageSentEventsPoller(
pollingInterval: Duration,
private val l1SmartContractAddress: String,
private val l1EventsSearcher: EthLogsSearcher,
private val eventsQueue: PriorityBlockingQueue<MessageSentEvent>,
private val eventsQueue: Deque<MessageSentEvent>,
private val eventsQueueMaxCapacity: Int,
private val l2MessageService: L2MessageServiceSmartContractClient,
private val l1MessagesSentFetchLimit: UInt,
@@ -38,7 +38,7 @@ class L1MessageSentEventsPoller(
)
private fun nextMessageNumberToFetchFromL1(): SafeFuture<ULong> {
val queueLastMessage = eventsQueue.lastOrNull()
val queueLastMessage = eventsQueue.peekLast()
if (queueLastMessage != null) {
return SafeFuture.completedFuture(queueLastMessage.messageNumber.inc())
} else {
@@ -55,7 +55,11 @@ class L1MessageSentEventsPoller(
val remainingCapacity = queueRemainingCapacity()
if (remainingCapacity == 0) {
log.debug("MessageSent event queue is full, skipping fetching new events")
log.debug(
"skipping fetching MessageSent events: queueSize={} reached targetCapacity={}",
eventsQueue.size,
eventsQueueMaxCapacity
)
return SafeFuture.completedFuture(null)
}
@@ -63,7 +67,7 @@ class L1MessageSentEventsPoller(
.thenCompose { nextMessageNumberToFetchFromL1 ->
eventsFetcher.findL1MessageSentEvents(
startingMessageNumber = nextMessageNumberToFetchFromL1,
messagesToFetch = l1MessagesSentFetchLimit.coerceAtMost(remainingCapacity.toUInt()),
targetMessagesToFetch = l1MessagesSentFetchLimit.coerceAtMost(remainingCapacity.toUInt()),
fetchTimeout = l1MessagesSentFetchTimeout,
blockChunkSize = l1BlockSearchChuck
)

View File

@@ -211,7 +211,7 @@ class MessageAnchoringAppTest {
}
@Test
fun `should be resilient when queue gets full`() {
fun `should not exceed queue target capacity and be resilient when queue gets full`() {
// Worst case scenario: L1 block has more messages that the queue can handle,
// so it needs to query this block multiple times
val ethLogs = createL1MessageSentV1Logs(
@@ -220,19 +220,38 @@ class MessageAnchoringAppTest {
startingMessageNumber = 1UL
)
val messageQueueSoftCap = 20u
val l1PollingInterval = 1.milliseconds
val anchoringApp = createApp(
l1PollingInterval = 1.milliseconds,
l1PollingInterval = l1PollingInterval,
l1EventSearchBlockChunk = 10u,
l1EventPollingTimeout = 1.seconds,
messageQueueCapacity = 20u,
messageQueueCapacity = messageQueueSoftCap,
maxMessagesToAnchorPerL2Transaction = 50u,
anchoringTickInterval = 10.milliseconds
)
addLogsToFakeEthClient(ethLogs)
l1Client.setFinalizedBlockTag(ethLogs.last().messageSent.log.blockNumber + 10UL)
l2MessageService.forceAnchoringFailures = true
anchoringApp.start().get()
// wait for the queue to fill up
await()
.atMost(10.seconds.toJavaDuration())
.untilAsserted {
assertThat(anchoringApp.eventsQueueSize).isGreaterThanOrEqualTo(messageQueueSoftCap.toInt())
}
val filledEventsQueueSize = anchoringApp.eventsQueueSize
// ensure it has multiple event polling ticks but does
// not fetch more events that those already fetched
// in the 1st tick that filled the queue
Thread.sleep(l1PollingInterval.inWholeMilliseconds * 2L)
assertThat(anchoringApp.eventsQueueSize).isEqualTo(filledEventsQueueSize)
// enable anchoring again
l2MessageService.forceAnchoringFailures = false
await()
.atMost(10.seconds.toJavaDuration())
.untilAsserted {

View File

@@ -17,6 +17,9 @@ class FakeL2MessageService(
private var lastAnchoredL1MessageNumber: ULong = 0uL
private var lastAnchoredRollingHash: ByteArray = ByteArray(0)
@get:Synchronized @set:Synchronized
var forceAnchoringFailures: Boolean = false
override fun getAddress(): String = contractAddress
@Synchronized
@@ -48,6 +51,9 @@ class FakeL2MessageService(
"finalMessageNumber=$finalMessageNumber - startingMessageNumber=$startingMessageNumber + 1UL " +
"must be equal to messageHashes.size=${messageHashes.size}"
}
if (forceAnchoringFailures) {
return SafeFuture.failedFuture(RuntimeException("FakeL2MessageService: forced anchoring failure"))
}
lastAnchoredL1MessageNumber = finalMessageNumber
lastAnchoredRollingHash = finalRollingHash

View File

@@ -55,12 +55,12 @@ class EthLogsSearcherImpl(
)
} else {
findLogWithBinarySearch(
start,
end,
chunkSize,
address,
topics,
shallContinueToSearch
fromBlock = start,
toBlock = end,
chunkSize = chunkSize,
address = address,
topics = topics,
shallContinueToSearchPredicate = shallContinueToSearch
)
}
}
@@ -86,13 +86,13 @@ class EthLogsSearcherImpl(
)
} else {
getLogsLoopingForward(
start,
end,
address,
topics,
chunkSize,
searchTimeout,
stopAfterTargetLogsCount
fromBlock = start,
toBlock = end,
address = address,
topics = topics,
chunkSize = chunkSize,
searchTimeout = searchTimeout,
logsSoftLimit = stopAfterTargetLogsCount
)
}
}
@@ -105,7 +105,7 @@ class EthLogsSearcherImpl(
topics: List<String?>,
chunkSize: UInt,
searchTimeout: Duration,
logsLimit: UInt?
logsSoftLimit: UInt?
): SafeFuture<EthLogsSearcher.LogSearchResult> {
val cursor = ConsecutiveSearchCursor(fromBlock, toBlock, chunkSize.toInt(), SearchDirection.FORWARD)
@@ -117,7 +117,7 @@ class EthLogsSearcherImpl(
vertx,
backoffDelay = config.loopSuccessBackoffDelay,
stopRetriesPredicate = {
val enoughLogsCollected = logsCollected.size >= (logsLimit?.toInt() ?: Int.MAX_VALUE)
val enoughLogsCollected = logsCollected.size >= (logsSoftLimit?.toInt() ?: Int.MAX_VALUE)
val collectionTimeoutElapsed = (clock.now() - startTime) >= searchTimeout
val noMoreChunksToCollect = !cursor.hasNext()