feat: add request timeout for conflated trace, trace counter, and sta… (#1125)

* feat: add request timeout for conflated trace, trace counter, and state manager requests

* fix: requestTimeout for TracesGeneratorJsonRpcClientV2
This commit is contained in:
jonesho
2025-06-20 02:42:10 +08:00
committed by GitHub
parent c074814d30
commit 0c1649788e
11 changed files with 39 additions and 0 deletions

View File

@@ -88,6 +88,7 @@ failures-warning-threshold = 10
[traces.conflation]
endpoints = ["http://traces-node:8545/"]
request-limit-per-endpoint = 1
request-timeout = "PT60S" # each conflated trace request would be timeout in 60 seconds
[traces.conflation.request-retries]
backoff-delay = "PT1S"
failures-warning-threshold = 10

View File

@@ -2,12 +2,14 @@ package linea.coordinator.config.v2
import linea.domain.RetryConfig
import java.net.URL
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
data class StateManagerConfig(
val version: String,
val endpoints: List<URL>,
val requestLimitPerEndpoint: UInt = UInt.MAX_VALUE,
val requestTimeout: Duration? = null,
val requestRetries: RetryConfig = RetryConfig.endlessRetry(
backoffDelay = 1.seconds,
failuresWarningThreshold = 3u,

View File

@@ -2,6 +2,7 @@ package linea.coordinator.config.v2
import linea.domain.RetryConfig
import java.net.URL
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
data class TracesConfig(
@@ -14,6 +15,7 @@ data class TracesConfig(
data class ClientApiConfig(
val endpoints: List<URL>,
val requestLimitPerEndpoint: UInt = 100u,
val requestTimeout: Duration? = null,
val requestRetries: RetryConfig = RetryConfig.endlessRetry(
backoffDelay = 1.seconds,
failuresWarningThreshold = 3u,

View File

@@ -2,12 +2,14 @@ package linea.coordinator.config.v2.toml
import linea.coordinator.config.v2.StateManagerConfig
import java.net.URL
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
data class StateManagerToml(
val version: String,
val endpoints: List<URL>,
val requestLimitPerEndpoint: UInt = UInt.MAX_VALUE,
val requestTimeout: Duration? = null,
val requestRetries: RequestRetriesToml = RequestRetriesToml.endlessRetry(
backoffDelay = 1.seconds,
failuresWarningThreshold = 3u,
@@ -18,6 +20,7 @@ data class StateManagerToml(
version = this.version,
endpoints = this.endpoints,
requestLimitPerEndpoint = this.requestLimitPerEndpoint,
requestTimeout = this.requestTimeout,
requestRetries = this.requestRetries.asDomain,
)
}

View File

@@ -2,6 +2,7 @@ package linea.coordinator.config.v2.toml
import linea.coordinator.config.v2.TracesConfig
import java.net.URL
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
data class TracesToml(
@@ -14,6 +15,7 @@ data class TracesToml(
data class ClientApiConfigToml(
val endpoints: List<URL>,
val requestLimitPerEndpoint: UInt = UInt.MAX_VALUE,
val requestTimeout: Duration? = null,
val requestRetries: RequestRetriesToml = RequestRetriesToml.endlessRetry(
backoffDelay = 1.seconds,
failuresWarningThreshold = 3u,
@@ -23,6 +25,7 @@ data class TracesToml(
return "ClientApiConfigToml(" +
"endpoints=$endpoints, " +
"requestLimitPerEndpoint=$requestLimitPerEndpoint, " +
"requestTimeout=$requestTimeout, " +
"requestRetries=$requestRetries" +
")"
}
@@ -34,11 +37,13 @@ data class TracesToml(
counters = TracesConfig.ClientApiConfig(
endpoints = counters.endpoints,
requestLimitPerEndpoint = counters.requestLimitPerEndpoint,
requestTimeout = counters.requestTimeout,
requestRetries = counters.requestRetries.asDomain,
),
conflation = TracesConfig.ClientApiConfig(
endpoints = conflation.endpoints,
requestLimitPerEndpoint = conflation.requestLimitPerEndpoint,
requestTimeout = conflation.requestTimeout,
requestRetries = conflation.requestRetries.asDomain,
),
/*

View File

@@ -137,6 +137,7 @@ class ConflationApp(
endpoints = configs.stateManager.endpoints.map { it.toURI() },
maxInflightRequestsPerClient = configs.stateManager.requestLimitPerEndpoint,
requestRetry = configs.stateManager.requestRetries.toJsonRpcRetry(),
requestTimeout = configs.stateManager.requestTimeout?.inWholeMilliseconds,
zkStateManagerVersion = configs.stateManager.version,
logger = LogManager.getLogger("clients.StateManagerShomeiClient"),
)
@@ -273,6 +274,7 @@ class ConflationApp(
rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing(
endpoints = configs.traces.counters.endpoints.toSet(),
maxInflightRequestsPerClient = configs.traces.counters.requestLimitPerEndpoint,
requestTimeout = configs.traces.counters.requestTimeout?.inWholeMilliseconds,
log = tracesCountersLog,
),
config = TracesGeneratorJsonRpcClientV2.Config(
@@ -290,6 +292,7 @@ class ConflationApp(
rpcClient = httpJsonRpcClientFactory.createWithLoadBalancing(
endpoints = configs.traces.conflation.endpoints.toSet(),
maxInflightRequestsPerClient = configs.traces.conflation.requestLimitPerEndpoint,
requestTimeout = configs.traces.conflation.requestTimeout?.inWholeMilliseconds,
log = tracesConflationLog,
),
config = TracesGeneratorJsonRpcClientV2.Config(

View File

@@ -15,6 +15,7 @@ class StateManagerParsingTest {
version = "2.2.0"
endpoints = ["http://shomei:8888/"]
request-limit-per-endpoint = 3
request-timeout = "PT30S"
[state-manager.request-retries]
max-retries = 5
backoff-delay = "PT2S"
@@ -25,6 +26,7 @@ class StateManagerParsingTest {
version = "2.2.0",
endpoints = listOf("http://shomei:8888/".toURL()),
requestLimitPerEndpoint = 3u,
requestTimeout = 30.seconds,
requestRetries = RequestRetriesToml(
maxRetries = 5u,
backoffDelay = 2.seconds,
@@ -42,6 +44,7 @@ class StateManagerParsingTest {
version = "2.2.0",
endpoints = listOf("http://shomei:8888/".toURL()),
requestLimitPerEndpoint = UInt.MAX_VALUE,
requestTimeout = null,
requestRetries = RequestRetriesToml(
maxRetries = null,
backoffDelay = 1.seconds,

View File

@@ -25,6 +25,7 @@ class TracesParsingTest {
[traces.conflation]
endpoints = ["http://traces-api-2:8080/"]
request-limit-per-endpoint = 2
request-timeout = "PT60S"
[traces.conflation.request-retries]
max-retries = 30
backoff-delay = "PT3S"
@@ -64,6 +65,7 @@ class TracesParsingTest {
conflation = TracesToml.ClientApiConfigToml(
endpoints = listOf(URI.create("http://traces-api-2:8080/").toURL()),
requestLimitPerEndpoint = 2u,
requestTimeout = 60.seconds,
requestRetries = RequestRetriesToml(
maxRetries = 30u,
backoffDelay = 3.seconds,
@@ -108,6 +110,7 @@ class TracesParsingTest {
counters = TracesToml.ClientApiConfigToml(
endpoints = listOf(URI.create("http://traces-api-1:8080/").toURL()),
requestLimitPerEndpoint = UInt.MAX_VALUE,
requestTimeout = null,
requestRetries = RequestRetriesToml(
maxRetries = null,
backoffDelay = 1.seconds,
@@ -117,6 +120,7 @@ class TracesParsingTest {
conflation = TracesToml.ClientApiConfigToml(
endpoints = listOf(URI.create("http://traces-api-2:8080/").toURL()),
requestLimitPerEndpoint = UInt.MAX_VALUE,
requestTimeout = null,
requestRetries = RequestRetriesToml(
maxRetries = null,
backoffDelay = 1.seconds,

View File

@@ -37,6 +37,7 @@ class VertxHttpJsonRpcClient(
private val responseObjectMapper: ObjectMapper = objectMapper,
private val log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java),
private val requestResponseLogLevel: Level = Level.TRACE,
private val requestTimeout: Long? = null,
private val failuresLogLevel: Level = Level.DEBUG,
private val metricsCategory: MetricsCategory = object : MetricsCategory {
override val name: String = "jsonrpc"
@@ -45,6 +46,7 @@ class VertxHttpJsonRpcClient(
private val requestOptions = RequestOptions().apply {
setMethod(HttpMethod.POST)
setAbsoluteURI(endpoint)
requestTimeout?.let { setTimeout(it) }
}
private fun serializeRequest(request: JsonRpcRequest): String {

View File

@@ -27,6 +27,7 @@ interface JsonRpcClientFactory {
httpVersion: HttpVersion? = null,
requestObjectMapper: ObjectMapper = objectMapper,
responseObjectMapper: ObjectMapper = objectMapper,
requestTimeout: Long? = null,
shallRetryRequestsClientBasePredicate: Predicate<Result<Any?, Throwable>> = Predicate { it is Err },
log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java),
requestResponseLogLevel: Level = Level.TRACE,
@@ -47,6 +48,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion: HttpVersion? = null,
requestObjectMapper: ObjectMapper = objectMapper,
responseObjectMapper: ObjectMapper = objectMapper,
requestTimeout: Long? = null,
log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java),
requestResponseLogLevel: Level = this.requestResponseLogLevel,
failuresLogLevel: Level = this.failuresLogLevel,
@@ -66,6 +68,7 @@ class VertxHttpJsonRpcClientFactory(
log = log,
requestParamsObjectMapper = requestObjectMapper,
responseObjectMapper = responseObjectMapper,
requestTimeout = requestTimeout,
requestResponseLogLevel = requestResponseLogLevel,
failuresLogLevel = failuresLogLevel,
)
@@ -77,6 +80,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion: HttpVersion? = null,
requestObjectMapper: ObjectMapper = objectMapper,
responseObjectMapper: ObjectMapper = objectMapper,
requestTimeout: Long? = null,
log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java),
requestResponseLogLevel: Level = this.requestResponseLogLevel,
failuresLogLevel: Level = this.failuresLogLevel,
@@ -89,6 +93,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion = httpVersion,
requestObjectMapper = requestObjectMapper,
responseObjectMapper = responseObjectMapper,
requestTimeout = requestTimeout,
log = log,
requestResponseLogLevel = requestResponseLogLevel,
failuresLogLevel = failuresLogLevel,
@@ -106,6 +111,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion: HttpVersion? = null,
requestObjectMapper: ObjectMapper = objectMapper,
responseObjectMapper: ObjectMapper = objectMapper,
requestTimeout: Long? = null,
log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java),
requestResponseLogLevel: Level = this.requestResponseLogLevel,
failuresLogLevel: Level = this.failuresLogLevel,
@@ -116,6 +122,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion = httpVersion,
requestObjectMapper = requestObjectMapper,
responseObjectMapper = responseObjectMapper,
requestTimeout = requestTimeout,
log = log,
requestResponseLogLevel = requestResponseLogLevel,
failuresLogLevel = failuresLogLevel,
@@ -141,6 +148,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion: HttpVersion? = null,
requestObjectMapper: ObjectMapper = objectMapper,
responseObjectMapper: ObjectMapper = objectMapper,
requestTimeout: Long? = null,
log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java),
requestResponseLogLevel: Level = this.requestResponseLogLevel,
failuresLogLevel: Level = this.failuresLogLevel,
@@ -151,6 +159,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion = httpVersion,
requestObjectMapper = requestObjectMapper,
responseObjectMapper = responseObjectMapper,
requestTimeout = requestTimeout,
log = log,
requestResponseLogLevel = requestResponseLogLevel,
failuresLogLevel = failuresLogLevel,
@@ -176,6 +185,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion: HttpVersion?,
requestObjectMapper: ObjectMapper,
responseObjectMapper: ObjectMapper,
requestTimeout: Long?,
shallRetryRequestsClientBasePredicate: Predicate<Result<Any?, Throwable>>,
log: Logger,
requestResponseLogLevel: Level,
@@ -194,6 +204,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion = httpVersion,
requestObjectMapper = requestObjectMapper,
responseObjectMapper = responseObjectMapper,
requestTimeout = requestTimeout,
log = log,
requestResponseLogLevel = requestResponseLogLevel,
failuresLogLevel = failuresLogLevel,
@@ -204,6 +215,7 @@ class VertxHttpJsonRpcClientFactory(
httpVersion = httpVersion,
requestObjectMapper = requestObjectMapper,
responseObjectMapper = responseObjectMapper,
requestTimeout = requestTimeout,
log = log,
requestResponseLogLevel = requestResponseLogLevel,
failuresLogLevel = failuresLogLevel,

View File

@@ -31,6 +31,7 @@ class StateManagerV1JsonRpcClient(
endpoints: List<URI>,
maxInflightRequestsPerClient: UInt,
requestRetry: RequestRetryConfig,
requestTimeout: Long? = null,
zkStateManagerVersion: String,
logger: Logger = LogManager.getLogger(StateManagerV1JsonRpcClient::class.java),
): StateManagerV1JsonRpcClient {
@@ -39,6 +40,7 @@ class StateManagerV1JsonRpcClient(
endpoints = endpoints,
maxInflightRequestsPerClient = maxInflightRequestsPerClient,
retryConfig = requestRetry,
requestTimeout = requestTimeout,
log = logger,
shallRetryRequestsClientBasePredicate = { it is Err },
),