Otel take 2 (#4075)

* Revert "Revert "Upgrade OpenTelemetry (#3675)" (#4031)"

This reverts commit 17de636fe2.

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Make sure we don't initialize the OpenTelemetry global singleton by mistake

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* disable global otel singleton explicitly

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* make sure to set GlobalOpenTelemetry at most once to avoid test failures

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* reset for tests

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* fix changelog

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>
This commit is contained in:
Antoine Toulme
2022-11-14 01:01:43 -08:00
committed by GitHub
parent 3406f06fcf
commit 5b462af0a7
30 changed files with 458 additions and 250 deletions

View File

@@ -10,6 +10,7 @@
- Improve performance of block processing by parallelizing some parts during the "commit" step [#4635](https://github.com/hyperledger/besu/pull/4635)
- Upgrade RocksDB version from 7.6.0 to 7.7.3
- Added new RPC endpoints `debug_setHead` & `debug_replayBlock [4580](https://github.com/hyperledger/besu/pull/4580)
- Upgrade OpenTelemetry to version 1.19.0 [#3675](https://github.com/hyperledger/besu/pull/3675)
### Bug Fixes
@@ -65,6 +66,7 @@
https://hyperledger.jfrog.io/hyperledger/besu-binaries/besu/22.10.0/besu-22.10.0.tar.gz / sha256: 18590796831a6c6c2ca17ba8e6877dd2bd63c25e034f1bbc987aaa0a9c3a178e
https://hyperledger.jfrog.io/hyperledger/besu-binaries/besu/22.10.0/besu-22.10.0.zip / sha256: 8ad4927469e8e128a3a2c7a708f108393eebc82c522f66cdcac4b7d206e07f90
## 22.10.0-RC2
### Breaking Changes

View File

@@ -33,6 +33,7 @@ dependencies {
implementation 'io.reactivex.rxjava2:rxjava'
implementation 'io.vertx:vertx-core'
implementation 'junit:junit'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'org.apache.tuweni:tuweni-bytes'
implementation 'org.apache.tuweni:tuweni-io'
implementation 'org.apache.tuweni:tuweni-units'

View File

@@ -127,6 +127,7 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
private Optional<Integer> exitCode = Optional.empty();
private Optional<PkiKeyStoreConfiguration> pkiKeyStoreConfiguration = Optional.empty();
private final boolean isStrictTxReplayProtectionEnabled;
private final Map<String, String> environment;
public BesuNode(
final String name,
@@ -159,7 +160,8 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
final List<String> runCommand,
final Optional<KeyPair> keyPair,
final Optional<PkiKeyStoreConfiguration> pkiKeyStoreConfiguration,
final boolean isStrictTxReplayProtectionEnabled)
final boolean isStrictTxReplayProtectionEnabled,
final Map<String, String> environment)
throws IOException {
this.homeDirectory = dataPath.orElseGet(BesuNode::createTmpDataDirectory);
this.isStrictTxReplayProtectionEnabled = isStrictTxReplayProtectionEnabled;
@@ -216,6 +218,7 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
this.isDnsEnabled = isDnsEnabled;
privacyParameters.ifPresent(this::setPrivacyParameters);
this.pkiKeyStoreConfiguration = pkiKeyStoreConfiguration;
this.environment = environment;
LOG.info("Created BesuNode {}", this);
}
@@ -794,4 +797,9 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
public void setExitCode(final int exitValue) {
this.exitCode = Optional.of(exitValue);
}
@Override
public Map<String, String> getEnvironment() {
return environment;
}
}

View File

@@ -419,6 +419,8 @@ public class ProcessBesuNodeRunner implements BesuNodeRunner {
"JAVA_OPTS",
"-Djava.security.properties="
+ "acceptance-tests/tests/build/resources/test/acceptanceTesting.security");
// add additional environment variables
processBuilder.environment().putAll(node.getEnvironment());
try {
checkState(
isNotAliveOrphan(node.getName()),

View File

@@ -65,6 +65,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,6 +134,7 @@ public class ThreadBesuNodeRunner implements BesuNodeRunner {
buildPluginContext(
node, storageService, securityModuleService, commonPluginConfiguration));
GlobalOpenTelemetry.resetForTest();
final ObservableMetricsSystem metricsSystem =
MetricsSystemFactory.create(node.getMetricsConfiguration());
final List<EnodeURL> bootnodes =

View File

@@ -30,6 +30,7 @@ import org.hyperledger.besu.tests.acceptance.dsl.node.configuration.genesis.Gene
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class BesuNodeConfiguration {
@@ -65,6 +66,7 @@ public class BesuNodeConfiguration {
private final Optional<KeyPair> keyPair;
private final Optional<PkiKeyStoreConfiguration> pkiKeyStoreConfiguration;
private final boolean strictTxReplayProtectionEnabled;
private final Map<String, String> environment;
BesuNodeConfiguration(
final String name,
@@ -97,7 +99,8 @@ public class BesuNodeConfiguration {
final List<String> runCommand,
final Optional<KeyPair> keyPair,
final Optional<PkiKeyStoreConfiguration> pkiKeyStoreConfiguration,
final boolean strictTxReplayProtectionEnabled) {
final boolean strictTxReplayProtectionEnabled,
final Map<String, String> environment) {
this.name = name;
this.miningParameters = miningParameters;
this.jsonRpcConfiguration = jsonRpcConfiguration;
@@ -129,6 +132,7 @@ public class BesuNodeConfiguration {
this.keyPair = keyPair;
this.pkiKeyStoreConfiguration = pkiKeyStoreConfiguration;
this.strictTxReplayProtectionEnabled = strictTxReplayProtectionEnabled;
this.environment = environment;
}
public String getName() {
@@ -254,4 +258,8 @@ public class BesuNodeConfiguration {
public boolean isStrictTxReplayProtectionEnabled() {
return strictTxReplayProtectionEnabled;
}
public Map<String, String> getEnvironment() {
return environment;
}
}

View File

@@ -44,7 +44,9 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class BesuNodeConfigurationBuilder {
@@ -85,6 +87,7 @@ public class BesuNodeConfigurationBuilder {
private Optional<KeyPair> keyPair = Optional.empty();
private Optional<PkiKeyStoreConfiguration> pkiKeyStoreConfiguration = Optional.empty();
private Boolean strictTxReplayProtectionEnabled = false;
private Map<String, String> environment = new HashMap<>();
public BesuNodeConfigurationBuilder() {
// Check connections more frequently during acceptance tests to cut down on
@@ -483,6 +486,11 @@ public class BesuNodeConfigurationBuilder {
return this;
}
public BesuNodeConfigurationBuilder environment(final Map<String, String> environment) {
this.environment = environment;
return this;
}
public BesuNodeConfiguration build() {
return new BesuNodeConfiguration(
name,
@@ -515,6 +523,7 @@ public class BesuNodeConfigurationBuilder {
runCommand,
keyPair,
pkiKeyStoreConfiguration,
strictTxReplayProtectionEnabled);
strictTxReplayProtectionEnabled,
environment);
}
}

View File

@@ -88,7 +88,8 @@ public class BesuNodeFactory {
config.getRunCommand(),
config.getKeyPair(),
config.getPkiKeyStoreConfiguration(),
config.isStrictTxReplayProtectionEnabled());
config.isStrictTxReplayProtectionEnabled(),
config.getEnvironment());
}
public BesuNode createMinerNode(

View File

@@ -18,6 +18,7 @@ import org.hyperledger.besu.tests.acceptance.dsl.node.configuration.genesis.Gene
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public interface NodeConfiguration {
@@ -59,4 +60,6 @@ public interface NodeConfiguration {
boolean isRevertReasonEnabled();
List<String> getStaticNodes();
Map<String, String> getEnvironment();
}

View File

@@ -128,7 +128,8 @@ public class PrivacyNode implements AutoCloseable {
List.of(),
Optional.empty(),
Optional.empty(),
besuConfig.isStrictTxReplayProtectionEnabled());
besuConfig.isStrictTxReplayProtectionEnabled(),
besuConfig.getEnvironment());
}
public void testEnclaveConnection(final List<PrivacyNode> otherNodes) {

View File

@@ -55,15 +55,16 @@ dependencies {
testImplementation 'com.github.tomakehurst:wiremock-jre8-standalone'
testImplementation 'commons-io:commons-io'
testImplementation 'io.grpc:grpc-all'
testImplementation 'io.grpc:grpc-core'
testImplementation 'io.grpc:grpc-netty'
testImplementation 'io.grpc:grpc-stub'
testImplementation 'io.jaegertracing:jaeger-client'
testImplementation 'io.jaegertracing:jaeger-proto'
testImplementation 'io.opentelemetry:opentelemetry-extension-trace-propagators'
testImplementation 'io.opentelemetry.instrumentation:opentelemetry-okhttp-3.0'
testImplementation 'io.netty:netty-all'
testImplementation 'io.opentelemetry.proto:opentelemetry-proto'
testImplementation 'io.opentelemetry:opentelemetry-api'
testImplementation 'io.opentelemetry:opentelemetry-exporter-otlp'
testImplementation 'io.opentelemetry.proto:opentelemetry-proto'
testImplementation 'io.opentelemetry:opentelemetry-sdk'
testImplementation 'io.opentelemetry:opentelemetry-sdk-trace'
testImplementation 'io.opentracing.contrib:opentracing-okhttp3'

View File

@@ -26,7 +26,9 @@ import org.hyperledger.besu.tests.acceptance.dsl.node.configuration.BesuNodeConf
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.io.Closer;
import com.google.protobuf.ByteString;
@@ -34,7 +36,11 @@ import io.grpc.Server;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.jaegertracing.Configuration;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.instrumentation.okhttp.v3_0.OkHttpTelemetry;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
@@ -44,8 +50,9 @@ import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.Span;
import io.opentracing.Tracer;
import io.opentracing.contrib.okhttp3.TracingCallFactory;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
@@ -120,6 +127,7 @@ public class OpenTelemetryAcceptanceTest extends AcceptanceTestBase {
@Before
public void setUp() throws Exception {
System.setProperty("root.log.level", "DEBUG");
Server server =
NettyServerBuilder.forPort(4317)
.addService(fakeTracesCollector)
@@ -135,12 +143,22 @@ public class OpenTelemetryAcceptanceTest extends AcceptanceTestBase {
.port(0)
.hostsAllowlist(singletonList("*"))
.build();
Map<String, String> env = new HashMap<>();
env.put("OTEL_METRIC_EXPORT_INTERVAL", "1000");
env.put("OTEL_TRACES_SAMPLER", "always_on");
env.put("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317");
env.put("OTEL_EXPORTER_OTLP_INSECURE", "true");
env.put("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc");
env.put("OTEL_BSP_SCHEDULE_DELAY", "1000");
env.put("OTEL_BSP_EXPORT_TIMEOUT", "3000");
metricsNode =
besu.create(
new BesuNodeConfigurationBuilder()
.name("metrics-node")
.jsonRpcEnabled()
.metricsConfiguration(configuration)
.environment(env)
.build());
cluster.start(metricsNode);
}
@@ -170,11 +188,11 @@ public class OpenTelemetryAcceptanceTest extends AcceptanceTestBase {
net.netVersion().verify(metricsNode);
List<ResourceSpans> spans = fakeTracesCollector.getReceivedSpans();
assertThat(spans.isEmpty()).isFalse();
Span internalSpan = spans.get(0).getInstrumentationLibrarySpans(0).getSpans(0);
Span internalSpan = spans.get(0).getScopeSpans(0).getSpans(0);
assertThat(internalSpan.getKind()).isEqualTo(Span.SpanKind.SPAN_KIND_INTERNAL);
ByteString parent = internalSpan.getParentSpanId();
assertThat(parent.isEmpty()).isFalse();
Span serverSpan = spans.get(0).getInstrumentationLibrarySpans(0).getSpans(1);
Span serverSpan = spans.get(0).getScopeSpans(0).getSpans(1);
assertThat(serverSpan.getKind()).isEqualTo(Span.SpanKind.SPAN_KIND_SERVER);
ByteString rootSpanId = serverSpan.getParentSpanId();
assertThat(rootSpanId.isEmpty()).isTrue();
@@ -184,23 +202,26 @@ public class OpenTelemetryAcceptanceTest extends AcceptanceTestBase {
@Test
public void traceReportingWithTraceId() {
Duration timeout = Duration.ofSeconds(1);
OkHttpClient okClient =
new OkHttpClient.Builder()
.connectTimeout(timeout)
.readTimeout(timeout)
.writeTimeout(timeout)
.build();
WaitUtils.waitFor(
30,
() -> {
// call the json RPC endpoint to generate a trace - with trace metadata of our own
Configuration config =
new Configuration("okhttp")
.withSampler(
Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1));
Tracer tracer = config.getTracer();
Call.Factory client = new TracingCallFactory(okClient, tracer);
OpenTelemetry openTelemetry =
OpenTelemetrySdk.builder()
.setPropagators(
ContextPropagators.create(
TextMapPropagator.composite(B3Propagator.injectingSingleHeader())))
.setTracerProvider(
SdkTracerProvider.builder().setSampler(Sampler.alwaysOn()).build())
.build();
Call.Factory client =
OkHttpTelemetry.builder(openTelemetry)
.build()
.newCallFactory(
new OkHttpClient.Builder()
.connectTimeout(timeout)
.readTimeout(timeout)
.writeTimeout(timeout)
.build());
Request request =
new Request.Builder()
.url("http://localhost:" + metricsNode.getJsonRpcPort().get())
@@ -210,19 +231,22 @@ public class OpenTelemetryAcceptanceTest extends AcceptanceTestBase {
MediaType.get("application/json")))
.build();
Response response = client.newCall(request).execute();
assertThat(response.code()).isEqualTo(200);
response.close();
List<ResourceSpans> spans = new ArrayList<>(fakeTracesCollector.getReceivedSpans());
fakeTracesCollector.getReceivedSpans().clear();
assertThat(spans.isEmpty()).isFalse();
Span internalSpan = spans.get(0).getInstrumentationLibrarySpans(0).getSpans(0);
assertThat(internalSpan.getKind()).isEqualTo(Span.SpanKind.SPAN_KIND_INTERNAL);
ByteString parent = internalSpan.getParentSpanId();
assertThat(parent.isEmpty()).isFalse();
Span serverSpan = spans.get(0).getInstrumentationLibrarySpans(0).getSpans(1);
assertThat(serverSpan.getKind()).isEqualTo(Span.SpanKind.SPAN_KIND_SERVER);
ByteString rootSpanId = serverSpan.getParentSpanId();
assertThat(rootSpanId.isEmpty()).isFalse();
try {
assertThat(response.code()).isEqualTo(200);
List<ResourceSpans> spans = new ArrayList<>(fakeTracesCollector.getReceivedSpans());
assertThat(spans.isEmpty()).isFalse();
Span internalSpan = spans.get(0).getScopeSpans(0).getSpans(0);
assertThat(internalSpan.getKind()).isEqualTo(Span.SpanKind.SPAN_KIND_INTERNAL);
ByteString parent = internalSpan.getParentSpanId();
assertThat(parent.isEmpty()).isFalse();
Span serverSpan = spans.get(0).getScopeSpans(0).getSpans(1);
assertThat(serverSpan.getKind()).isEqualTo(Span.SpanKind.SPAN_KIND_SERVER);
ByteString rootSpanId = serverSpan.getParentSpanId();
assertThat(rootSpanId.isEmpty()).isFalse();
} finally {
response.close();
fakeTracesCollector.getReceivedSpans().clear();
}
});
}
}

View File

@@ -88,6 +88,7 @@ dependencies {
testImplementation 'com.google.auto.service:auto-service'
testImplementation 'com.squareup.okhttp3:okhttp'
testImplementation 'commons-io:commons-io'
testImplementation 'io.opentelemetry:opentelemetry-api'
testImplementation 'junit:junit'
testImplementation 'org.apache.commons:commons-text'
testImplementation 'org.apache.tuweni:tuweni-bytes'

View File

@@ -99,6 +99,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import org.apache.tuweni.bytes.Bytes;
@@ -319,6 +320,8 @@ public abstract class CommandTestAbstract {
@Before
public void setUpStreams() {
// reset the global opentelemetry singleton
GlobalOpenTelemetry.resetForTest();
commandOutput.reset();
commandErrorOutput.reset();
System.setOut(new PrintStream(commandOutput));
@@ -355,6 +358,8 @@ public abstract class CommandTestAbstract {
protected TestBesuCommand parseCommand(final InputStream in, final String... args) {
// turn off ansi usage globally in picocli
System.setProperty("picocli.ansi", "false");
// reset GlobalOpenTelemetry
GlobalOpenTelemetry.resetForTest();
final TestBesuCommand besuCommand =
new TestBesuCommand(

View File

@@ -33,6 +33,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.tls.TlsClientAuthConfiguration;
import org.hyperledger.besu.ethereum.api.tls.TlsConfiguration;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
@@ -56,12 +57,13 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
@@ -125,6 +127,7 @@ public class JsonRpcHttpService {
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private TracerProvider tracerProvider;
private Tracer tracer;
private final int maxActiveConnections;
private final AtomicInteger activeConnectionsCount = new AtomicInteger();
@@ -201,6 +204,9 @@ public class JsonRpcHttpService {
this.livenessService = livenessService;
this.readinessService = readinessService;
this.maxActiveConnections = config.getMaxActiveConnections();
if (metricsSystem instanceof OpenTelemetrySystem) {
this.tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}
}
private void validateConfig(final JsonRpcConfiguration config) {
@@ -215,8 +221,11 @@ public class JsonRpcHttpService {
public CompletableFuture<?> start() {
LOG.info("Starting JSON-RPC service on {}:{}", config.getHost(), config.getPort());
LOG.debug("max number of active connections {}", maxActiveConnections);
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
if (this.tracerProvider != null) {
this.tracer = tracerProvider.get("org.hyperledger.besu.jsonrpc", "1.0.0");
} else {
this.tracer = OpenTelemetry.noop().getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
}
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
try {

View File

@@ -39,6 +39,7 @@ import org.hyperledger.besu.ethereum.api.tls.TlsClientAuthConfiguration;
import org.hyperledger.besu.ethereum.api.tls.TlsConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
@@ -62,12 +63,13 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
@@ -135,6 +137,7 @@ public class JsonRpcService {
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private TracerProvider tracerProvider;
private Tracer tracer;
private final int maxActiveConnections;
private final AtomicInteger activeConnectionsCount = new AtomicInteger();
@@ -183,6 +186,9 @@ public class JsonRpcService {
"Time taken to process a JSON-RPC request",
"methodName");
JsonRpcProcessor jsonRpcProcessor = new BaseJsonRpcProcessor();
if (metricsSystem instanceof OpenTelemetrySystem) {
this.tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}
this.socketConfiguration =
maybeSockets.isPresent() ? maybeSockets.get() : WebSocketConfiguration.createDefault();
@@ -215,8 +221,11 @@ public class JsonRpcService {
public CompletableFuture<Void> start() {
LOG.info("Starting JSON-RPC service on {}:{}", config.getHost(), config.getPort());
LOG.debug("max number of active connections {}", maxActiveConnections);
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
if (this.tracerProvider != null) {
this.tracer = tracerProvider.get("org.hyperledger.besu.jsonrpc", "1.0.0");
} else {
this.tracer = OpenTelemetry.noop().getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
}
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
try {
// Create the HTTP server and a router object.

View File

@@ -65,14 +65,12 @@ dependencyManagement {
dependency 'info.picocli:picocli:4.6.3'
dependencySet(group: 'io.grpc', version: '1.47.0') {
entry'grpc-core'
entry'grpc-netty'
entry'grpc-stub'
entry 'grpc-all'
entry 'grpc-core'
entry 'grpc-netty'
entry 'grpc-stub'
}
dependency 'io.jaegertracing:jaeger-client:1.8.0'
dependency 'io.jaegertracing:jaeger-proto:0.7.0'
dependency 'io.kubernetes:client-java:15.0.1'
dependency 'io.netty:netty-all:4.1.78.Final'
@@ -81,14 +79,16 @@ dependencyManagement {
dependency group: 'io.netty', name: 'netty-transport-native-kqueue', version:'4.1.78.Final', classifier: 'osx-x86_64'
dependency 'io.netty:netty-transport-native-unix-common:4.1.78.Final'
dependency 'io.opentelemetry:opentelemetry-api:1.6.0'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.6.0-alpha'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp:1.6.0'
dependency 'io.opentelemetry:opentelemetry-extension-trace-propagators:1.6.0'
dependency 'io.opentelemetry:opentelemetry-sdk-trace:1.6.0'
dependency 'io.opentelemetry:opentelemetry-sdk:1.6.0'
dependency 'io.opentelemetry:opentelemetry-semconv:1.6.0-alpha'
dependency 'io.opentelemetry.proto:opentelemetry-proto:0.13.0-alpha'
dependency 'io.opentelemetry:opentelemetry-api:1.19.0'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp:1.19.0'
dependency 'io.opentelemetry:opentelemetry-extension-trace-propagators:1.19.0'
dependency 'io.opentelemetry.proto:opentelemetry-proto:0.19.0-alpha'
dependency 'io.opentelemetry:opentelemetry-sdk-metrics:1.19.0'
dependency 'io.opentelemetry:opentelemetry-sdk-trace:1.19.0'
dependency 'io.opentelemetry:opentelemetry-sdk:1.19.0'
dependency 'io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.19.0-alpha'
dependency 'io.opentelemetry:opentelemetry-semconv:1.19.0-alpha'
dependency 'io.opentelemetry.instrumentation:opentelemetry-okhttp-3.0:1.19.1-alpha'
dependency 'io.opentracing.contrib:opentracing-okhttp3:3.0.0'
dependency 'io.opentracing:opentracing-api:0.33.0'

View File

@@ -41,7 +41,6 @@ dependencies {
implementation 'com.google.guava:guava'
implementation 'io.grpc:grpc-netty'
implementation 'io.grpc:grpc-core'
implementation 'io.jaegertracing:jaeger-proto'
implementation 'io.netty:netty-tcnative-boringssl-static'
implementation 'io.netty:netty-transport-native-epoll'
implementation 'io.netty:netty-all'
@@ -49,9 +48,9 @@ dependencies {
implementation 'io.opentelemetry:opentelemetry-sdk'
implementation 'io.opentelemetry:opentelemetry-semconv'
implementation 'io.opentelemetry:opentelemetry-sdk-trace'
implementation 'io.opentelemetry:opentelemetry-sdk-metrics'
implementation 'io.opentelemetry:opentelemetry-exporter-otlp'
implementation 'io.opentelemetry:opentelemetry-exporter-otlp-metrics'
implementation 'io.opentelemetry.proto:opentelemetry-proto'
implementation 'io.opentelemetry:opentelemetry-sdk-extension-autoconfigure'
implementation 'io.prometheus:simpleclient'
implementation 'io.prometheus:simpleclient_common'

View File

@@ -14,8 +14,7 @@
*/
package org.hyperledger.besu.metrics;
import org.hyperledger.besu.metrics.opentelemetry.MetricsOtelGrpcPushService;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.metrics.opentelemetry.MetricsOtelPushService;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import org.hyperledger.besu.metrics.prometheus.MetricsHttpService;
import org.hyperledger.besu.metrics.prometheus.MetricsPushGatewayService;
@@ -49,8 +48,7 @@ public interface MetricsService {
}
} else if (configuration.getProtocol() == MetricsProtocol.OPENTELEMETRY) {
if (configuration.isEnabled()) {
return Optional.of(
new MetricsOtelGrpcPushService(configuration, (OpenTelemetrySystem) metricsSystem));
return Optional.of(new MetricsOtelPushService());
} else {
return Optional.empty();
}

View File

@@ -22,6 +22,8 @@ import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +34,10 @@ public class MetricsSystemFactory {
private MetricsSystemFactory() {}
private static void disableGlobalOpenTelemetry() {
GlobalOpenTelemetry.set(OpenTelemetry.noop());
}
/**
* Creates and starts a new metric system to observe the behavior of the client
*
@@ -41,6 +47,7 @@ public class MetricsSystemFactory {
public static ObservableMetricsSystem create(final MetricsConfiguration metricsConfiguration) {
LOG.trace("Creating a metric system with {}", metricsConfiguration.getProtocol());
if (!metricsConfiguration.isEnabled() && !metricsConfiguration.isPushEnabled()) {
disableGlobalOpenTelemetry();
return new NoOpMetricsSystem();
}
if (PROMETHEUS.equals(metricsConfiguration.getProtocol())) {
@@ -48,13 +55,15 @@ public class MetricsSystemFactory {
new PrometheusMetricsSystem(
metricsConfiguration.getMetricCategories(), metricsConfiguration.isTimersEnabled());
metricsSystem.init();
disableGlobalOpenTelemetry();
return metricsSystem;
} else if (OPENTELEMETRY.equals(metricsConfiguration.getProtocol())) {
final OpenTelemetrySystem metricsSystem =
new OpenTelemetrySystem(
metricsConfiguration.getMetricCategories(),
metricsConfiguration.isTimersEnabled(),
metricsConfiguration.getPrometheusJob());
metricsConfiguration.getPrometheusJob(),
true);
metricsSystem.initDefaults();
return metricsSystem;
} else {

View File

@@ -0,0 +1,57 @@
/*
* Copyright Besu Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.metrics.opentelemetry;
import java.util.Collection;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
import org.jetbrains.annotations.NotNull;
class DebugMetricReader implements MetricReader {
private CollectionRegistration registration;
public DebugMetricReader() {}
public Collection<MetricData> getAllMetrics() {
return MetricProducer.asMetricProducer(this.registration).collectAllMetrics();
}
@Override
public void register(final @NotNull CollectionRegistration registration) {
this.registration = registration;
}
@Override
public CompletableResultCode forceFlush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
@Override
public AggregationTemporality getAggregationTemporality(
final @NotNull InstrumentType instrumentType) {
return AggregationTemporality.CUMULATIVE;
}
}

View File

@@ -1,88 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.metrics.opentelemetry;
import org.hyperledger.besu.metrics.MetricsService;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReaderBuilder;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MetricsOtelGrpcPushService implements MetricsService {
private static final Logger LOG = LoggerFactory.getLogger(MetricsOtelGrpcPushService.class);
private final MetricsConfiguration configuration;
private final OpenTelemetrySystem metricsSystem;
private IntervalMetricReader periodicReader;
private SpanProcessor spanProcessor;
public MetricsOtelGrpcPushService(
final MetricsConfiguration configuration, final OpenTelemetrySystem metricsSystem) {
this.configuration = configuration;
this.metricsSystem = metricsSystem;
}
@Override
public CompletableFuture<?> start() {
LOG.info("Starting OpenTelemetry push service");
OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.getDefault();
IntervalMetricReaderBuilder builder =
IntervalMetricReader.builder()
.setExportIntervalMillis(configuration.getPushInterval() * 1000L)
.setMetricProducers(Collections.singleton(metricsSystem.getMeterSdkProvider()))
.setMetricExporter(exporter);
this.periodicReader = builder.buildAndStart();
this.spanProcessor = BatchSpanProcessor.builder(OtlpGrpcSpanExporter.builder().build()).build();
OpenTelemetrySdk.builder()
.setTracerProvider(SdkTracerProvider.builder().addSpanProcessor(spanProcessor).build())
.buildAndRegisterGlobal();
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<?> stop() {
if (periodicReader != null) {
periodicReader.shutdown();
}
if (spanProcessor != null) {
CompletableResultCode result = spanProcessor.shutdown();
CompletableFuture<?> future = new CompletableFuture<>();
result.whenComplete(() -> future.complete(null));
return future;
}
return CompletableFuture.completedFuture(null);
}
@Override
public Optional<Integer> getPort() {
return Optional.empty();
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright Hyperledger Besu Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.metrics.opentelemetry;
import org.hyperledger.besu.metrics.MetricsService;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MetricsOtelPushService implements MetricsService {
private static final Logger LOG = LoggerFactory.getLogger(MetricsOtelPushService.class);
public MetricsOtelPushService() {}
@Override
public CompletableFuture<?> start() {
LOG.info("Starting OpenTelemetry push service");
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<?> stop() {
return CompletableFuture.completedFuture(null);
}
@Override
public Optional<Integer> getPort() {
return Optional.empty();
}
}

View File

@@ -19,7 +19,6 @@ import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.BoundLongCounter;
import io.opentelemetry.api.metrics.LongCounter;
public class OpenTelemetryCounter implements LabelledMetric<Counter> {
@@ -39,25 +38,26 @@ public class OpenTelemetryCounter implements LabelledMetric<Counter> {
builder.put(labelNames[i], labelValues[i]);
}
final Attributes labels = builder.build();
BoundLongCounter boundLongCounter = counter.bind(labels);
return new OpenTelemetryCounter.UnlabelledCounter(boundLongCounter);
return new BoundLongCounter(counter, labels);
}
private static class UnlabelledCounter implements Counter {
private final BoundLongCounter counter;
private static class BoundLongCounter implements Counter {
private final LongCounter counter;
private final Attributes labels;
private UnlabelledCounter(final BoundLongCounter counter) {
private BoundLongCounter(final LongCounter counter, final Attributes labels) {
this.counter = counter;
this.labels = labels;
}
@Override
public void inc() {
counter.add(1);
counter.add(1, labels);
}
@Override
public void inc(final long amount) {
counter.add(amount);
counter.add(amount, labels);
}
}
}

View File

@@ -64,6 +64,6 @@ public class OpenTelemetryGauge implements LabelledGauge {
private void updater(final ObservableDoubleMeasurement measurement) {
observationsMap.forEach(
(labels, valueSupplier) -> measurement.observe(valueSupplier.getAsDouble(), labels));
(labels, valueSupplier) -> measurement.record(valueSupplier.getAsDouble(), labels));
}
}

View File

@@ -31,12 +31,14 @@ import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import java.util.stream.Stream;
@@ -45,15 +47,20 @@ import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.data.SummaryPointData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,25 +84,35 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
private final Map<String, LabelledMetric<Counter>> cachedCounters = new ConcurrentHashMap<>();
private final Map<String, LabelledMetric<OperationTimer>> cachedTimers =
new ConcurrentHashMap<>();
private final SdkMeterProvider meterSdkProvider;
private final SdkMeterProvider sdkMeterProvider;
private final DebugMetricReader debugMetricReader;
private final SdkTracerProvider sdkTracerProvider;
public OpenTelemetrySystem(
final Set<MetricCategory> enabledCategories,
final boolean timersEnabled,
final String jobName) {
final String jobName,
final boolean setAsGlobal) {
LOG.info("Starting OpenTelemetry metrics system");
this.enabledCategories = ImmutableSet.copyOf(enabledCategories);
this.timersEnabled = timersEnabled;
this.debugMetricReader = new DebugMetricReader();
Resource resource =
Resource.getDefault()
.merge(
Resource.create(
Attributes.builder().put(ResourceAttributes.SERVICE_NAME, jobName).build()));
this.meterSdkProvider = SdkMeterProvider.builder().setResource(resource).build();
}
SdkMeterProvider getMeterSdkProvider() {
return meterSdkProvider;
AutoConfiguredOpenTelemetrySdk autoSdk =
AutoConfiguredOpenTelemetrySdk.builder()
.addMeterProviderCustomizer(
(provider, config) ->
provider.setResource(resource).registerMetricReader(debugMetricReader))
.addTracerProviderCustomizer((provider, config) -> provider.setResource(resource))
.setResultAsGlobal(setAsGlobal)
.build();
OpenTelemetrySdk sdk = autoSdk.getOpenTelemetrySdk();
this.sdkMeterProvider = sdk.getSdkMeterProvider();
this.sdkTracerProvider = sdk.getSdkTracerProvider();
}
@Override
@@ -105,14 +122,17 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
@Override
public Stream<Observation> streamObservations() {
Collection<MetricData> metricsList = meterSdkProvider.collectAllMetrics();
return metricsList.stream().map(this::convertToObservations).flatMap(stream -> stream);
Collection<MetricData> metricsList = this.debugMetricReader.getAllMetrics();
return metricsList.stream().flatMap(this::convertToObservations);
}
private Stream<Observation> convertToObservations(final MetricData metricData) {
List<Observation> observations = new ArrayList<>();
MetricCategory category =
categoryNameToMetricCategory(metricData.getInstrumentationLibraryInfo().getName());
categoryNameToMetricCategory(metricData.getInstrumentationScopeInfo().getName());
if (category == null) {
return Stream.empty();
}
Collection<?> points;
switch (metricData.getType()) {
case DOUBLE_GAUGE:
@@ -122,13 +142,13 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
points = metricData.getDoubleSumData().getPoints();
break;
case SUMMARY:
points = metricData.getDoubleSummaryData().getPoints();
points = metricData.getData().getPoints();
break;
case LONG_SUM:
points = metricData.getLongSumData().getPoints();
break;
case HISTOGRAM:
points = metricData.getDoubleHistogramData().getPoints();
points = metricData.getData().getPoints();
break;
case LONG_GAUGE:
points = metricData.getLongGaugeData().getPoints();
@@ -159,7 +179,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
return category;
}
}
throw new IllegalArgumentException("Invalid metric category: " + name);
return null;
}
private Object extractValue(final MetricDataType type, final PointData point) {
@@ -170,9 +190,9 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
case DOUBLE_GAUGE:
return ((DoublePointData) point).getValue();
case SUMMARY:
return ((DoubleSummaryPointData) point).getPercentileValues();
return ((SummaryPointData) point).getValues();
case HISTOGRAM:
return ((DoubleHistogramPointData) point).getCounts();
return ((HistogramPointData) point).getCounts();
default:
throw new UnsupportedOperationException("Unsupported type " + type);
}
@@ -189,7 +209,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
name,
(k) -> {
if (isCategoryEnabled(category)) {
final Meter meter = meterSdkProvider.get(category.getName());
final Meter meter = sdkMeterProvider.get(category.getName());
final LongCounter counter = meter.counterBuilder(name).setDescription(help).build();
return new OpenTelemetryCounter(counter, labelNames);
@@ -210,7 +230,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
name,
(k) -> {
if (timersEnabled && isCategoryEnabled(category)) {
final Meter meter = meterSdkProvider.get(category.getName());
final Meter meter = sdkMeterProvider.get(category.getName());
return new OpenTelemetryTimer(name, help, meter, labelNames);
} else {
return NoOpMetricsSystem.getOperationTimerLabelledMetric(labelNames.length);
@@ -226,11 +246,11 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
final DoubleSupplier valueSupplier) {
LOG.trace("Creating a gauge {}", name);
if (isCategoryEnabled(category)) {
final Meter meter = meterSdkProvider.get(category.getName());
final Meter meter = sdkMeterProvider.get(category.getName());
meter
.gaugeBuilder(name)
.setDescription(help)
.buildWithCallback(res -> res.observe(valueSupplier.getAsDouble(), Attributes.empty()));
.buildWithCallback(res -> res.record(valueSupplier.getAsDouble(), Attributes.empty()));
}
}
@@ -243,7 +263,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
LOG.trace("Creating a labelled gauge {}", name);
if (isCategoryEnabled(category)) {
return new OpenTelemetryGauge(
name, help, meterSdkProvider.get(category.getName()), List.of(labelNames));
name, help, sdkMeterProvider.get(category.getName()), List.of(labelNames));
}
return NoOpMetricsSystem.getLabelledGauge(labelNames.length);
}
@@ -264,7 +284,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
ManagementFactory.getGarbageCollectorMXBeans();
final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
final List<MemoryPoolMXBean> poolBeans = ManagementFactory.getMemoryPoolMXBeans();
final Meter meter = meterSdkProvider.get(StandardMetricCategory.JVM.getName());
final Meter meter = sdkMeterProvider.get(StandardMetricCategory.JVM.getName());
final List<Attributes> labelSets = new ArrayList<>(garbageCollectors.size());
for (final GarbageCollectorMXBean gc : garbageCollectors) {
labelSets.add(Attributes.of(AttributeKey.stringKey("gc"), gc.getName()));
@@ -276,7 +296,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
.buildWithCallback(
resultLongObserver -> {
for (int i = 0; i < garbageCollectors.size(); i++) {
resultLongObserver.observe(
resultLongObserver.record(
(double) garbageCollectors.get(i).getCollectionTime(), labelSets.get(i));
}
});
@@ -297,12 +317,12 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
resultLongObserver -> {
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage();
resultLongObserver.observe(heapUsage.getUsed(), usedHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), usedNonHeap);
resultLongObserver.observe(heapUsage.getUsed(), committedHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), committedNonHeap);
resultLongObserver.observe(heapUsage.getUsed(), maxHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), maxNonHeap);
resultLongObserver.record(heapUsage.getUsed(), usedHeap);
resultLongObserver.record(nonHeapUsage.getUsed(), usedNonHeap);
resultLongObserver.record(heapUsage.getUsed(), committedHeap);
resultLongObserver.record(nonHeapUsage.getUsed(), committedNonHeap);
resultLongObserver.record(heapUsage.getUsed(), maxHeap);
resultLongObserver.record(nonHeapUsage.getUsed(), maxNonHeap);
});
final List<Attributes> usedLabelSets = new ArrayList<>(poolBeans.size());
final List<Attributes> committedLabelSets = new ArrayList<>(poolBeans.size());
@@ -322,12 +342,24 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
resultLongObserver -> {
for (int i = 0; i < poolBeans.size(); i++) {
MemoryUsage poolUsage = poolBeans.get(i).getUsage();
resultLongObserver.observe(poolUsage.getUsed(), usedLabelSets.get(i));
resultLongObserver.observe(poolUsage.getCommitted(), committedLabelSets.get(i));
resultLongObserver.record(poolUsage.getUsed(), usedLabelSets.get(i));
resultLongObserver.record(poolUsage.getCommitted(), committedLabelSets.get(i));
// TODO: Decide if max is needed or not. May be derived with some approximation from
// max(used).
resultLongObserver.observe(poolUsage.getMax(), maxLabelSets.get(i));
resultLongObserver.record(poolUsage.getMax(), maxLabelSets.get(i));
}
});
}
/** Shuts down the OpenTelemetry exporters, blocking until they have completed orderly. */
public void shutdown() {
final CompletableResultCode result =
CompletableResultCode.ofAll(
Arrays.asList(this.sdkMeterProvider.shutdown(), this.sdkTracerProvider.shutdown()));
result.join(5000, TimeUnit.SECONDS);
}
public TracerProvider getTracerProvider() {
return sdkTracerProvider;
}
}

View File

@@ -51,7 +51,7 @@ public class OpenTelemetryTimer implements LabelledMetric<OperationTimer> {
meter
.gaugeBuilder(metricName)
.setDescription(help)
.buildWithCallback((measurement) -> measurement.observe((double) elapsed, labels));
.buildWithCallback((measurement) -> measurement.record((double) elapsed, labels));
return elapsed / 1e9;
};
};

View File

@@ -38,8 +38,13 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableSet;
import io.opentelemetry.api.GlobalOpenTelemetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class OpenTelemetryMetricsSystemTest {
@@ -49,11 +54,38 @@ public class OpenTelemetryMetricsSystemTest {
.thenComparing(Observation::getMetricName)
.thenComparing((o1, o2) -> o1.getLabels().equals(o2.getLabels()) ? 0 : 1);
private final ObservableMetricsSystem metricsSystem =
new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, true, "job");
@Before
public void resetGlobalOpenTelemetry() {
GlobalOpenTelemetry.resetForTest();
}
private OpenTelemetrySystem metricsSystem = null;
private List<Observation> getObservation(final ObservableMetricsSystem metricsSystem)
throws InterruptedException {
for (int i = 0; i < 20; i++) {
Stream<Observation> observations = metricsSystem.streamObservations();
List<Observation> result = observations.collect(Collectors.toList());
if (!result.isEmpty()) {
return result;
}
Thread.sleep(100);
}
return null;
}
@Before
public void setUp() {
metricsSystem = new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, true, "job", false);
}
@After
public void tearDown() {
metricsSystem.shutdown();
}
@Test
public void shouldCreateObservationFromCounter() {
public void shouldCreateObservationFromCounter() throws InterruptedException {
final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string");
counter.inc();
@@ -61,12 +93,12 @@ public class OpenTelemetryMetricsSystemTest {
.containsExactly(new Observation(PEERS, "connected", 1L, emptyList()));
counter.inc();
assertThat(metricsSystem.streamObservations())
assertThat(getObservation(metricsSystem))
.containsExactly(new Observation(PEERS, "connected", 2L, emptyList()));
}
@Test
public void shouldHandleDuplicateCounterCreation() {
public void shouldHandleDuplicateCounterCreation() throws InterruptedException {
final LabelledMetric<Counter> counter1 =
metricsSystem.createLabelledCounter(PEERS, "connected", "Some help string");
final LabelledMetric<Counter> counter2 =
@@ -78,7 +110,7 @@ public class OpenTelemetryMetricsSystemTest {
.containsExactly(new Observation(PEERS, "connected", 1L, emptyList()));
counter2.labels().inc();
assertThat(metricsSystem.streamObservations())
assertThat(getObservation(metricsSystem))
.containsExactly(new Observation(PEERS, "connected", 2L, emptyList()));
}
@@ -98,7 +130,7 @@ public class OpenTelemetryMetricsSystemTest {
}
@Test
public void shouldIncrementCounterBySpecifiedAmount() {
public void shouldIncrementCounterBySpecifiedAmount() throws InterruptedException {
final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string");
counter.inc(5);
@@ -106,7 +138,7 @@ public class OpenTelemetryMetricsSystemTest {
.containsExactly(new Observation(PEERS, "connected", 5L, emptyList()));
counter.inc(6);
assertThat(metricsSystem.streamObservations())
assertThat(getObservation(metricsSystem))
.containsExactly(new Observation(PEERS, "connected", 11L, emptyList()));
}
@@ -152,15 +184,21 @@ public class OpenTelemetryMetricsSystemTest {
@Test
public void shouldNotCreateObservationsFromTimerWhenTimersDisabled() {
final ObservableMetricsSystem metricsSystem =
new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, false, "job");
final LabelledMetric<OperationTimer> timer =
metricsSystem.createLabelledTimer(RPC, "request", "Some help", "methodName");
OpenTelemetrySystem metricsSystem = null;
try {
metricsSystem = new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, false, "job", false);
final LabelledMetric<OperationTimer> timer =
metricsSystem.createLabelledTimer(RPC, "request", "Some help", "methodName");
//noinspection EmptyTryBlock
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {}
//noinspection EmptyTryBlock
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {}
assertThat(metricsSystem.streamObservations()).isEmpty();
assertThat(metricsSystem.streamObservations()).isEmpty();
} finally {
if (metricsSystem != null) {
metricsSystem.shutdown();
}
}
}
@Test
@@ -171,12 +209,24 @@ public class OpenTelemetryMetricsSystemTest {
.enabled(true)
.protocol(OPENTELEMETRY)
.build();
final ObservableMetricsSystem localMetricSystem =
MetricsSystemFactory.create(metricsConfiguration);
localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7.0);
OpenTelemetrySystem localMetricSystem = null;
try {
localMetricSystem =
new OpenTelemetrySystem(
metricsConfiguration.getMetricCategories(),
metricsConfiguration.isTimersEnabled(),
metricsConfiguration.getPrometheusJob(),
false);
localMetricSystem.initDefaults();
localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7.0);
assertThat(localMetricSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(RPC, "myValue", 7.0, emptyList()));
assertThat(localMetricSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(RPC, "myValue", 7.0, emptyList()));
} finally {
if (localMetricSystem != null) {
localMetricSystem.shutdown();
}
}
}
@Test
@@ -195,32 +245,45 @@ public class OpenTelemetryMetricsSystemTest {
}
@Test
public void shouldOnlyObserveEnabledMetrics() {
public void shouldOnlyObserveEnabledMetrics() throws InterruptedException {
final MetricsConfiguration metricsConfiguration =
MetricsConfiguration.builder()
.metricCategories(ImmutableSet.of(BesuMetricCategory.RPC))
.enabled(true)
.protocol(OPENTELEMETRY)
.build();
final ObservableMetricsSystem localMetricSystem =
MetricsSystemFactory.create(metricsConfiguration);
OpenTelemetrySystem localMetricSystem = null;
try {
localMetricSystem =
new OpenTelemetrySystem(
metricsConfiguration.getMetricCategories(),
metricsConfiguration.isTimersEnabled(),
metricsConfiguration.getPrometheusJob(),
false);
localMetricSystem.initDefaults();
// do a category we are not watching
final LabelledMetric<Counter> counterN =
localMetricSystem.createLabelledCounter(NETWORK, "ABC", "Not that kind of network", "show");
assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
// do a category we are not watching
final LabelledMetric<Counter> counterN =
localMetricSystem.createLabelledCounter(
NETWORK, "ABC", "Not that kind of network", "show");
assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
counterN.labels("show").inc();
assertThat(localMetricSystem.streamObservations()).isEmpty();
counterN.labels("show").inc();
assertThat(localMetricSystem.streamObservations()).isEmpty();
// do a category we are watching
final LabelledMetric<Counter> counterR =
localMetricSystem.createLabelledCounter(RPC, "name", "Not useful", "method");
assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
// do a category we are watching
final LabelledMetric<Counter> counterR =
localMetricSystem.createLabelledCounter(RPC, "name", "Not useful", "method");
assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
counterR.labels("op").inc();
assertThat(localMetricSystem.streamObservations())
.containsExactly(new Observation(RPC, "name", (long) 1, singletonList("op")));
counterR.labels("op").inc();
assertThat(getObservation(localMetricSystem))
.containsExactly(new Observation(RPC, "name", (long) 1, singletonList("op")));
} finally {
if (localMetricSystem != null) {
localMetricSystem.shutdown();
}
}
}
@Test
@@ -244,21 +307,15 @@ public class OpenTelemetryMetricsSystemTest {
.pushEnabled(false)
.protocol(OPENTELEMETRY)
.build();
final MetricsSystem localMetricSystem = MetricsSystemFactory.create(metricsConfiguration);
OpenTelemetrySystem localMetricSystem = null;
try {
localMetricSystem = (OpenTelemetrySystem) MetricsSystemFactory.create(metricsConfiguration);
assertThat(localMetricSystem).isInstanceOf(OpenTelemetrySystem.class);
}
@Test
public void returnsNoOpMetricsWhenPushEnabled() {
final MetricsConfiguration metricsConfiguration =
MetricsConfiguration.builder()
.enabled(false)
.pushEnabled(true)
.protocol(OPENTELEMETRY)
.build();
final MetricsSystem localMetricSystem = MetricsSystemFactory.create(metricsConfiguration);
assertThat(localMetricSystem).isInstanceOf(OpenTelemetrySystem.class);
assertThat(localMetricSystem).isInstanceOf(OpenTelemetrySystem.class);
} finally {
if (localMetricSystem != null) {
localMetricSystem.shutdown();
}
}
}
}

View File

@@ -23,6 +23,7 @@ import org.hyperledger.besu.metrics.MetricsSystemFactory;
import java.net.InetSocketAddress;
import java.util.Properties;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.prometheus.client.exporter.common.TextFormat;
import io.vertx.core.Vertx;
import okhttp3.OkHttpClient;
@@ -55,10 +56,12 @@ public class MetricsHttpServiceTest {
}
private static MetricsHttpService createMetricsHttpService(final MetricsConfiguration config) {
GlobalOpenTelemetry.resetForTest();
return new MetricsHttpService(vertx, config, MetricsSystemFactory.create(config));
}
private static MetricsHttpService createMetricsHttpService() {
GlobalOpenTelemetry.resetForTest();
final MetricsConfiguration metricsConfiguration = createMetricsConfig();
return new MetricsHttpService(
vertx, metricsConfiguration, MetricsSystemFactory.create(metricsConfiguration));

View File

@@ -41,6 +41,8 @@ import java.util.Comparator;
import java.util.List;
import com.google.common.collect.ImmutableSet;
import io.opentelemetry.api.GlobalOpenTelemetry;
import org.junit.Before;
import org.junit.Test;
public class PrometheusMetricsSystemTest {
@@ -50,6 +52,11 @@ public class PrometheusMetricsSystemTest {
.thenComparing(Observation::getMetricName)
.thenComparing((o1, o2) -> o1.getLabels().equals(o2.getLabels()) ? 0 : 1);
@Before
public void resetGlobalOpenTelemetry() {
GlobalOpenTelemetry.resetForTest();
}
private final ObservableMetricsSystem metricsSystem =
new PrometheusMetricsSystem(DEFAULT_METRIC_CATEGORIES, true);