mirror of
https://github.com/vacp2p/linea-monorepo.git
synced 2026-01-09 15:38:06 -05:00
small improvements: error found by Kotlin 2 compiler (#250)
* small improvements: error found by Kotlin 2 compiler * fix flacky test
This commit is contained in:
@@ -6,7 +6,11 @@ class SimpleCompositeSafeFutureHandler<T>(
|
||||
private val handlers: List<(T) -> SafeFuture<*>>
|
||||
) : (T) -> SafeFuture<*> {
|
||||
override fun invoke(arg: T): SafeFuture<Unit> {
|
||||
val handlingFutures = handlers.map { it.invoke(arg) }
|
||||
val handlingFutures =
|
||||
handlers.map {
|
||||
kotlin.runCatching { it.invoke(arg) }
|
||||
.getOrElse { SafeFuture.failedFuture<T>(it) }
|
||||
}
|
||||
return SafeFuture.allOf(*handlingFutures.toTypedArray()).thenApply { }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,27 +1,47 @@
|
||||
package net.consensys.zkevm.ethereum.coordination.common
|
||||
|
||||
import net.consensys.zkevm.ethereum.coordination.SimpleCompositeSafeFutureHandler
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.mockito.ArgumentMatchers.eq
|
||||
import org.mockito.Mockito.spy
|
||||
import org.mockito.Mockito.times
|
||||
import org.mockito.kotlin.verify
|
||||
import tech.pegasys.teku.infrastructure.async.SafeFuture
|
||||
|
||||
class SimpleCompositeSafeFutureHandlerTest {
|
||||
@Test
|
||||
fun `SimpleCompositeSafeFutureHandler propagates argument to all the handlers exactly once`() {
|
||||
val testHandler = { _: Long -> SafeFuture.completedFuture(Unit) }
|
||||
fun `should propagate argument to all the handlers exactly once`() {
|
||||
val handler1Calls = mutableListOf<String>()
|
||||
val handler2Calls = mutableListOf<String>()
|
||||
val handler3Calls = mutableListOf<String>()
|
||||
val handlers = listOf(
|
||||
spy(testHandler),
|
||||
spy(testHandler)
|
||||
{ value: Long -> SafeFuture.completedFuture(handler1Calls.add("handler1:$value")) },
|
||||
{ value: Long ->
|
||||
handler2Calls.add("handler2:$value")
|
||||
SafeFuture.failedFuture(RuntimeException("Handler 2 failed"))
|
||||
},
|
||||
{ value: Long -> SafeFuture.completedFuture(handler3Calls.add("handler3:$value")) }
|
||||
)
|
||||
|
||||
val expectedArgument = 13L
|
||||
SimpleCompositeSafeFutureHandler(handlers).invoke(expectedArgument)
|
||||
SimpleCompositeSafeFutureHandler(handlers).invoke(123)
|
||||
|
||||
handlers.forEach {
|
||||
verify(it, times(1)).invoke(eq(expectedArgument))
|
||||
}
|
||||
assertThat(handler1Calls).containsExactly("handler1:123")
|
||||
assertThat(handler2Calls).containsExactly("handler2:123")
|
||||
assertThat(handler3Calls).containsExactly("handler3:123")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should be resilient_or_not_to handler failure`() {
|
||||
val handler1Calls = mutableListOf<String>()
|
||||
val handler2Calls = mutableListOf<String>()
|
||||
val handler3Calls = mutableListOf<String>()
|
||||
val handlers = listOf(
|
||||
{ value: Long -> SafeFuture.completedFuture(handler1Calls.add("handler1:$value")) },
|
||||
{ value: Long -> handler2Calls.add("handler2:$value"); throw RuntimeException("Forced error") },
|
||||
{ value: Long -> SafeFuture.completedFuture(handler3Calls.add("handler3:$value")) }
|
||||
)
|
||||
|
||||
SimpleCompositeSafeFutureHandler(handlers).invoke(123)
|
||||
|
||||
assertThat(handler1Calls).containsExactly("handler1:123")
|
||||
assertThat(handler2Calls).containsExactly("handler2:123")
|
||||
assertThat(handler3Calls).containsExactly("handler3:123")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,9 +17,9 @@ class L1ShnarfBasedAlreadySubmittedBlobsFilter(
|
||||
* if blobRecords=[b1, b2, b3, b4, b5, b6] the result will be [b4, b5, b6]
|
||||
*/
|
||||
override fun invoke(
|
||||
blobRecords: List<BlobRecord>
|
||||
items: List<BlobRecord>
|
||||
): SafeFuture<List<BlobRecord>> {
|
||||
val blockByShnarfQueryFutures = blobRecords.map { blobRecord ->
|
||||
val blockByShnarfQueryFutures = items.map { blobRecord ->
|
||||
lineaRollup
|
||||
.isBlobShnarfPresent(shnarf = blobRecord.expectedShnarf)
|
||||
.thenApply { isShnarfPresent ->
|
||||
@@ -39,8 +39,8 @@ class L1ShnarfBasedAlreadySubmittedBlobsFilter(
|
||||
}
|
||||
.thenApply { highestBlobEndBlockNumberFoundInL1 ->
|
||||
highestBlobEndBlockNumberFoundInL1
|
||||
?.let { blockNumber -> blobRecords.filter { it.startBlockNumber > blockNumber } }
|
||||
?: blobRecords
|
||||
?.let { blockNumber -> items.filter { it.startBlockNumber > blockNumber } }
|
||||
?: items
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -384,7 +384,7 @@ class BlobCompressionProofCoordinatorIntTest : CleanDbTestSuiteParallel() {
|
||||
blobZkStateCount += 1
|
||||
if (blobZkStateFailures <= maxMockedBlobZkStateFailures && blobZkStateCount % 2 == 0) {
|
||||
blobZkStateFailures += 1
|
||||
SafeFuture.failedFuture(RuntimeException("Forced mock blobZkStateProvider failure"))
|
||||
SafeFuture.failedFuture<BlobZkState>(RuntimeException("Forced mock blobZkStateProvider failure"))
|
||||
} else {
|
||||
SafeFuture.completedFuture(
|
||||
BlobZkState(
|
||||
|
||||
@@ -11,7 +11,7 @@ hoplite = "2.7.5"
|
||||
jackson = "2.18.0"
|
||||
jna = "5.14.0"
|
||||
junit = "5.10.1"
|
||||
kotlinxDatetime = "0.4.0"
|
||||
kotlinxDatetime = "0.6.1"
|
||||
ktlint = "0.47.0"
|
||||
log4j = "2.20.0"
|
||||
micrometer = "1.8.4"
|
||||
|
||||
@@ -2,9 +2,11 @@ package net.consensys.linea.async
|
||||
|
||||
import tech.pegasys.teku.infrastructure.async.SafeFuture
|
||||
|
||||
fun interface AsyncFilter<T> : (List<T>) -> SafeFuture<List<T>> {
|
||||
fun interface AsyncFilter<T> {
|
||||
fun invoke(items: List<T>): SafeFuture<List<T>>
|
||||
|
||||
fun then(next: AsyncFilter<T>): AsyncFilter<T> = AsyncFilter { items ->
|
||||
this(items).thenCompose(next)
|
||||
this.invoke(items).thenCompose(next::invoke)
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
@@ -261,26 +261,24 @@ class AsyncRetryerTest {
|
||||
|
||||
@Test
|
||||
fun `retry should stop after timeout is elapsed - promise resolves, predicate evaluation`(vertx: Vertx) {
|
||||
var callCount = AtomicInteger(0)
|
||||
val callCount = AtomicInteger(0)
|
||||
val future =
|
||||
AsyncRetryer.retry(
|
||||
vertx,
|
||||
20.milliseconds,
|
||||
timeout = 40.milliseconds,
|
||||
stopRetriesPredicate = { result -> result == "20" }
|
||||
backoffDelay = 5.milliseconds,
|
||||
timeout = 60.milliseconds,
|
||||
stopRetriesPredicate = { false } // stop condition will never be met
|
||||
) {
|
||||
SafeFuture.completedFuture("${callCount.incrementAndGet()}")
|
||||
}
|
||||
runCatching { future.get() }
|
||||
assertThat(future)
|
||||
.isCompletedExceptionally
|
||||
.isNotCancelled
|
||||
.failsWithin(2.seconds.toJavaDuration())
|
||||
.withThrowableOfType(ExecutionException::class.java)
|
||||
.withCauseInstanceOf(RetriedExecutionException::class.java)
|
||||
.withMessageContaining("Stop condition wasn't met after timeout of 40ms.")
|
||||
.withMessageContaining("Stop condition wasn't met after timeout of 60ms")
|
||||
|
||||
assertThat(callCount.get()).isGreaterThan(1).isLessThan(4)
|
||||
// 60ms / 5ms = 12 retries
|
||||
assertThat(callCount.get()).isBetween(1, 13)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -127,7 +127,7 @@ class EthConnectionImpl(url: String?) : EthConnection {
|
||||
value.stream().sorted { s: TransactionDetail, t: TransactionDetail -> 1 * s.nonce.compareTo(t.nonce) }
|
||||
.toList()
|
||||
if (sorted.size == 0) {
|
||||
return@Function key.initialNonce
|
||||
return@Function key.initialNonce!!
|
||||
} else {
|
||||
sendAllTransactions(sorted[0], sorted.subList(1, sorted.size))
|
||||
}
|
||||
|
||||
@@ -6,5 +6,5 @@ plugins {
|
||||
dependencies {
|
||||
implementation project(":jvm-libs:generic:extensions:kotlin")
|
||||
|
||||
testFixtures implementation(testFixtures(project(":jvm-libs:generic:extensions:kotlin")))
|
||||
testFixturesImplementation(testFixtures(project(":jvm-libs:generic:extensions:kotlin")))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user