Warn if consensus client doesn't call transition configuration endpoint within 120 seconds (#3569)

* add qos timer to engine_exchangeTransitionConfiguration

Signed-off-by: garyschulte <garyschulte@gmail.com>
Co-authored-by: Justin Florentine <justin+github@florentine.us>
This commit is contained in:
garyschulte
2022-03-12 12:03:20 -08:00
committed by GitHub
parent a2fbd76c3a
commit 3b5dafbfa0
6 changed files with 294 additions and 18 deletions

View File

@@ -580,7 +580,7 @@ public class RunnerBuilder {
Optional<JsonRpcHttpService> jsonRpcHttpService = Optional.empty();
Optional<JsonRpcHttpService> engineJsonRpcHttpService = Optional.empty();
if (jsonRpcConfiguration.isEnabled()) {
final Map<String, JsonRpcMethod> allJsonRpcMethods =
final Map<String, JsonRpcMethod> nonEngineMethods =
jsonRpcMethods(
protocolSchedule,
context,
@@ -592,7 +592,9 @@ public class RunnerBuilder {
miningCoordinator,
metricsSystem,
supportedCapabilities,
jsonRpcConfiguration.getRpcApis(),
jsonRpcConfiguration.getRpcApis().stream()
.filter(apiGroup -> !apiGroup.toLowerCase().startsWith("engine"))
.collect(Collectors.toList()),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
@@ -605,11 +607,6 @@ public class RunnerBuilder {
dataDir,
rpcEndpointServiceImpl);
final Map<String, JsonRpcMethod> nonEngineMethods =
allJsonRpcMethods.entrySet().stream()
.filter(entry -> !entry.getKey().toLowerCase().startsWith("engine"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
jsonRpcHttpService =
Optional.of(
new JsonRpcHttpService(
@@ -704,7 +701,7 @@ public class RunnerBuilder {
Optional<WebSocketService> webSocketService = Optional.empty();
Optional<WebSocketService> engineWebSocketService = Optional.empty();
if (webSocketConfiguration.isEnabled()) {
final Map<String, JsonRpcMethod> webSocketsJsonRpcMethods =
final Map<String, JsonRpcMethod> nonEngineMethods =
jsonRpcMethods(
protocolSchedule,
context,
@@ -716,7 +713,9 @@ public class RunnerBuilder {
miningCoordinator,
metricsSystem,
supportedCapabilities,
webSocketConfiguration.getRpcApis(),
webSocketConfiguration.getRpcApis().stream()
.filter(apiGroup -> !apiGroup.toLowerCase().startsWith("engine"))
.collect(Collectors.toList()),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
@@ -749,7 +748,7 @@ public class RunnerBuilder {
vertx,
webSocketConfiguration,
subscriptionManager,
webSocketsJsonRpcMethods,
nonEngineMethods,
privacyParameters,
protocolSchedule,
blockchainQueries,

View File

@@ -14,11 +14,11 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.engine;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod.ENGINE_EXCHANGE_TRANSITION_CONFIGURATION;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.ExecutionEngineJsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.EngineExchangeTransitionConfigurationParameter;
@@ -26,8 +26,10 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcRespon
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.EngineExchangeTransitionConfigurationResult;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.util.QosTimer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
@@ -38,6 +40,17 @@ public class EngineExchangeTransitionConfiguration extends ExecutionEngineJsonRp
private static final Logger LOG =
LoggerFactory.getLogger(EngineExchangeTransitionConfiguration.class);
static final long QOS_TIMEOUT_MILLIS = 120000L;
private static final AtomicReference<QosTimer> qosTimerRef =
new AtomicReference<>(
new QosTimer(
QOS_TIMEOUT_MILLIS,
lastCall ->
LOG.warn(
"not called in {} seconds, consensus client may not be connected",
QOS_TIMEOUT_MILLIS / 1000L)));
public EngineExchangeTransitionConfiguration(
final Vertx vertx, final ProtocolContext protocolContext) {
super(vertx, protocolContext);
@@ -45,11 +58,14 @@ public class EngineExchangeTransitionConfiguration extends ExecutionEngineJsonRp
@Override
public String getName() {
return RpcMethod.ENGINE_EXCHANGE_TRANSITION_CONFIGURATION.getMethodName();
return ENGINE_EXCHANGE_TRANSITION_CONFIGURATION.getMethodName();
}
@Override
public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext) {
// update our QoS "last call time"
getQosTimer().resetTimer();
final EngineExchangeTransitionConfigurationParameter remoteTransitionConfiguration =
requestContext.getRequiredParameter(
0, EngineExchangeTransitionConfigurationParameter.class);
@@ -102,4 +118,9 @@ public class EngineExchangeTransitionConfiguration extends ExecutionEngineJsonRp
final EngineExchangeTransitionConfigurationResult transitionConfiguration) {
return new JsonRpcSuccessResponse(requestId, transitionConfiguration);
}
// QosTimer accessor for testing considerations
QosTimer getQosTimer() {
return qosTimerRef.get();
}
}

View File

@@ -15,6 +15,10 @@
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.engine;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.consensus.merge.MergeContext;
@@ -37,29 +41,31 @@ import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ParsedExtraData;
import org.hyperledger.besu.evm.log.LogsBloomFilter;
import org.hyperledger.besu.util.QosTimer;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@RunWith(VertxUnitRunner.class)
public class EngineExchangeTransitionConfigurationTest {
private EngineExchangeTransitionConfiguration method;
private static final Vertx vertx = Vertx.vertx();
@Mock private ProtocolContext protocolContext;
@Mock private MergeContext mergeContext;
private final ProtocolContext protocolContext = mock(ProtocolContext.class);
private final MergeContext mergeContext = mock(MergeContext.class);
@Before
public void setUp() {
@@ -166,6 +172,99 @@ public class EngineExchangeTransitionConfigurationTest {
.isEqualTo("0x0000000000000000000000000000000000000000000000000000000000000000");
}
@Test
public void shouldWarnWhenExchangeConfigNotCalledWithinTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 75L;
final Async async = ctx.async();
final AtomicInteger logCounter = new AtomicInteger(0);
final var spyMethod = spy(method);
final var spyTimer = spy(new QosTimer(TEST_QOS_TIMEOUT, z -> logCounter.incrementAndGet()));
when(spyMethod.getQosTimer()).thenReturn(spyTimer);
spyTimer.resetTimer();
vertx.setTimer(
100L,
z -> {
try {
// just once on construction:
verify(spyTimer, times(1)).resetTimer();
} catch (Exception ex) {
ctx.fail(ex);
}
// assert one warn:
ctx.assertEquals(1, logCounter.get());
async.complete();
});
}
@Test
public void shouldNotWarnWhenTimerExecutesBeforeTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 500L;
final Async async = ctx.async();
final AtomicInteger logCounter = new AtomicInteger(0);
final var spyMethod = spy(method);
final var spyTimer = spy(new QosTimer(TEST_QOS_TIMEOUT, z -> logCounter.incrementAndGet()));
when(spyMethod.getQosTimer()).thenReturn(spyTimer);
spyTimer.resetTimer();
vertx.setTimer(
50L,
z -> {
try {
// just once on construction:
verify(spyTimer, times(1)).resetTimer();
} catch (Exception ex) {
ctx.fail(ex);
}
// should not warn
ctx.assertEquals(0, logCounter.get());
async.complete();
});
}
@Test
public void shouldNotWarnWhenExchangeConfigurationCalledWithinTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 75L;
final Async async = ctx.async();
final AtomicInteger logCounter = new AtomicInteger(0);
final var spyMethod = spy(method);
final var spyTimer = spy(new QosTimer(TEST_QOS_TIMEOUT, z -> logCounter.incrementAndGet()));
when(mergeContext.getTerminalPoWBlock()).thenReturn(Optional.empty());
when(mergeContext.getTerminalTotalDifficulty()).thenReturn(Difficulty.of(1337L));
when(spyMethod.getQosTimer()).thenReturn(spyTimer);
spyTimer.resetTimer();
// call exchangeTransitionConfiguration 50 milliseconds hence to reset our QoS timer
vertx.setTimer(
50L,
z ->
spyMethod.syncResponse(
new JsonRpcRequestContext(
new JsonRpcRequest(
"2.0",
RpcMethod.ENGINE_EXCHANGE_TRANSITION_CONFIGURATION.getMethodName(),
new Object[] {
new EngineExchangeTransitionConfigurationParameter(
"24",
Hash.fromHexStringLenient("0x01").toHexString(),
new UnsignedLongParameter(0))
}))));
vertx.setTimer(
100L,
z -> {
try {
// once on construction, once on call:
verify(spyTimer, times(2)).resetTimer();
} catch (Exception ex) {
ctx.fail(ex);
}
// should not warn
ctx.assertEquals(0, logCounter.get());
async.complete();
});
}
private JsonRpcResponse resp(final EngineExchangeTransitionConfigurationParameter param) {
return method.response(
new JsonRpcRequestContext(

View File

@@ -41,6 +41,7 @@ dependencies {
implementation 'org.xerial.snappy:snappy-java'
testImplementation 'junit:junit'
testImplementation 'io.vertx:vertx-unit'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.mockito:mockito-core'
}

View File

@@ -0,0 +1,62 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.util;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
public class QosTimer {
private final Vertx timerVertx = Vertx.vertx();
private final AtomicLong timerId = new AtomicLong(Long.MAX_VALUE);
private final AtomicLong lastReset = new AtomicLong(System.currentTimeMillis());
private final long periodMillis;
private final Consumer<Long> consumerTask;
public QosTimer(final long periodMillis, final Consumer<Long> consumerTask) {
this.periodMillis = periodMillis;
this.consumerTask = consumerTask;
resetTimer();
}
public void resetTimer() {
lastReset.set(System.currentTimeMillis());
resetTimerHandler(timerHandler());
}
void resetTimerHandler(final Handler<Long> timerHandler) {
timerVertx.cancelTimer(timerId.get());
timerId.set(timerVertx.setTimer(periodMillis, timerHandler));
}
Handler<Long> timerHandler() {
return z -> {
var lastCall = getLastCallMillis();
var now = System.currentTimeMillis();
if (lastCall + periodMillis < now) {
consumerTask.accept(lastCall);
}
resetTimerHandler(timerHandler());
};
}
long getLastCallMillis() {
return lastReset.get();
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.util;
import java.util.concurrent.atomic.AtomicInteger;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(VertxUnitRunner.class)
public class QosTimerTest {
static Vertx vertx = Vertx.vertx();
@Ignore("fails on CI with short timeouts and don't want to slow test suite down with longer ones")
@Test
public void shouldExecuteConsecutivelyAtTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 100L;
final Async async = ctx.async();
final AtomicInteger execCount = new AtomicInteger(0);
new QosTimer(TEST_QOS_TIMEOUT, z -> execCount.incrementAndGet());
vertx.setTimer(
250L,
z -> {
ctx.assertEquals(2, execCount.get());
async.complete();
});
}
@Test
public void shouldExecuteOnceAtTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 75L;
final Async async = ctx.async();
final AtomicInteger execCount = new AtomicInteger(0);
new QosTimer(TEST_QOS_TIMEOUT, z -> execCount.incrementAndGet());
vertx.setTimer(
100L,
z -> {
ctx.assertEquals(1, execCount.get());
async.complete();
});
}
@Test
public void shouldNotExecuteBeforeTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 200L;
final Async async = ctx.async();
final AtomicInteger execCount = new AtomicInteger(0);
new QosTimer(TEST_QOS_TIMEOUT, z -> execCount.incrementAndGet());
vertx.setTimer(
50L,
z -> {
ctx.assertEquals(0, execCount.get());
async.complete();
});
}
@Test
public void shouldNotExecuteWhenReset(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 50L;
final Async async = ctx.async();
final AtomicInteger execCount = new AtomicInteger(0);
final var timer = new QosTimer(TEST_QOS_TIMEOUT, z -> execCount.incrementAndGet());
// reset QoS timer every 25 millis
vertx.setPeriodic(25L, z -> timer.resetTimer());
vertx.setTimer(
200L,
z -> {
ctx.assertEquals(0, execCount.get());
async.complete();
});
}
}