feat: demonstrate an example of metric ingestion #249

Draft · wants to merge 9 commits into base: main
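This draft demonstrates end-to-end metric ingestion: the new metrics generator and processor run as additional Kafka Streams sub-topologies inside hypertrace-ingester, while the metrics exporter runs alongside them on its own thread, consuming OTLP ResourceMetrics from a Kafka topic and exporting them to an OTLP gRPC collector.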
19 changes: 14 additions & 5 deletions hypertrace-ingester/build.gradle.kts
@@ -40,6 +40,9 @@ dependencies {
implementation(project(":raw-spans-grouper:raw-spans-grouper"))
implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher"))
implementation(project(":hypertrace-view-generator:hypertrace-view-generator"))
implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator"))
implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor"))
implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter"))

testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
@@ -64,7 +67,10 @@ tasks.register<Copy>("copyServiceConfigs") {
createCopySpec("span-normalizer", "span-normalizer", "main", "common"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "main", "common"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "main", "common"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common")
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common"),
createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "main", "common"),
createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "main", "common"),
createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "main", "common")
).into("./build/resources/main/configs/")
}

@@ -101,10 +107,13 @@ tasks.test {

tasks.register<Copy>("copyServiceConfigsTest") {
with(
createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator")
createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator"),
createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "test", "hypertrace-metrics-generator"),
createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "test", "hypertrace-metrics-processor"),
createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "test", "hypertrace-metrics-exporter")
).into("./build/resources/test/configs/")
}

hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java
@@ -17,6 +17,9 @@
import org.hypertrace.core.serviceframework.config.ConfigUtils;
import org.hypertrace.core.spannormalizer.SpanNormalizer;
import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher;
import org.hypertrace.metrics.exporter.MetricsExporter;
import org.hypertrace.metrics.generator.MetricsGenerator;
import org.hypertrace.metrics.processor.MetricsProcessor;
import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,9 +32,33 @@ public class HypertraceIngester extends KafkaStreamsApp {
private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config";

private Map<String, Pair<String, KafkaStreamsApp>> jobNameToSubTopology = new HashMap<>();
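// The metrics exporter is not a Kafka Streams sub-topology; it runs as a separate
// platform service on its own thread (see doStart/doStop below).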
private MetricsExporter metricsExporter;
private Thread metricsExporterThread;

public HypertraceIngester(ConfigClient configClient) {
super(configClient);
metricsExporter =
new MetricsExporter(configClient, getSubJobConfig("hypertrace-metrics-exporter"));
}

@Override
protected void doInit() {
super.doInit();
metricsExporter.doInit();
}

@Override
protected void doStart() {
super.doStart();
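// The exporter's doStart() blocks in a consume-export loop, so run it on its own thread.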
metricsExporterThread = new Thread(() -> metricsExporter.doStart());
metricsExporterThread.start();
}

@Override
protected void doStop() {
super.doStop();
// Interrupt the exporter loop first (Thread.stop() is deprecated and unsafe), then release its resources.
metricsExporterThread.interrupt();
metricsExporter.doStop();
}

private KafkaStreamsApp getSubTopologyInstance(String name) {
@@ -49,6 +76,12 @@ private KafkaStreamsApp getSubTopologyInstance(String name) {
case "all-views":
kafkaStreamsApp = new MultiViewGeneratorLauncher(ConfigClientFactory.getClient());
break;
case "hypertrace-metrics-generator":
kafkaStreamsApp = new MetricsGenerator(ConfigClientFactory.getClient());
break;
case "hypertrace-metrics-processor":
kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient());
break;
default:
throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name));
}
hypertrace-ingester/src/main/resources/configs/common/application.conf
@@ -3,7 +3,14 @@ main.class = org.hypertrace.ingester.HypertraceIngester
service.name = hypertrace-ingester
service.admin.port = 8099

sub.topology.names = ["span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views"]
sub.topology.names = [
"span-normalizer",
"raw-spans-grouper",
"hypertrace-trace-enricher",
"all-views",
"hypertrace-metrics-generator",
"hypertrace-metrics-processor"
]

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}
29 changes: 29 additions & 0 deletions hypertrace-ingester/src/main/resources/log4j2.properties
@@ -0,0 +1,29 @@
status = error
name = PropertiesConfig

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n

appender.rolling.type = RollingFile
appender.rolling.name = ROLLING_FILE
appender.rolling.fileName = ${sys:service.name:-hypertrace-ingester}.log
appender.rolling.filePattern = ${sys:service.name:-hypertrace-ingester}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 3600
appender.rolling.policies.time.modulate = true
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 20MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 5

rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = STDOUT
rootLogger.appenderRef.rolling.ref = ROLLING_FILE



3 changes: 3 additions & 0 deletions hypertrace-metrics-exporter/build.gradle.kts
@@ -0,0 +1,3 @@
subprojects {
group = "org.hypertrace.metrics.exporter"
}
49 changes: 49 additions & 0 deletions hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts
@@ -0,0 +1,49 @@
plugins {
java
application
jacoco
id("org.hypertrace.docker-java-application-plugin")
id("org.hypertrace.docker-publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
}

application {
mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
}

hypertraceDocker {
defaultImage {
javaApplication {
serviceName.set("${project.name}")
adminPort.set(8099)
}
}
}

tasks.test {
useJUnitPlatform()
}

dependencies {
// common and framework
implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

// open telemetry
implementation("io.opentelemetry:opentelemetry-api:1.4.1")
implementation("io.opentelemetry:opentelemetry-api-metrics:1.4.1-alpha")
implementation("io.opentelemetry:opentelemetry-sdk:1.4.1")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1")
implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpah")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha")

// kafka
implementation("org.apache.kafka:kafka-clients:2.6.0")

// test
testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
testImplementation("com.google.code.gson:gson:2.8.7")
}
89 changes: 89 additions & 0 deletions hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java
@@ -0,0 +1,89 @@
package org.hypertrace.metrics.exporter;

import com.google.protobuf.InvalidProtocolBufferException;
import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsConsumer.class);
private static final int CONSUMER_POLL_TIMEOUT_MS = 100;

private static final String KAFKA_CONFIG_KEY = "kafka.config";
private static final String INPUT_TOPIC_KEY = "input.topic";

private final KafkaConsumer<byte[], byte[]> consumer;

public MetricsConsumer(Config config) {
Properties props = new Properties();
props.putAll(
mergeProperties(getBaseProperties(), getFlatMapConfig(config.getConfig(KAFKA_CONFIG_KEY))));
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(config.getString(INPUT_TOPIC_KEY)));
}

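// Polls Kafka once and decodes each record as an OTLP ResourceMetrics protobuf;
// malformed records are logged and skipped.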
public List<ResourceMetrics> consume() {
List<ResourceMetrics> resourceMetrics = new ArrayList<>();

ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
records.forEach(
record -> {
try {
resourceMetrics.add(ResourceMetrics.parseFrom(record.value()));
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Invalid record with exception", e);
}
});

return resourceMetrics;
}

public void close() {
consumer.close();
}

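// Defaults for the exporter's Kafka consumer; entries under kafka.config in the
// service config override these (see mergeProperties).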
private Map<String, Object> getBaseProperties() {
Map<String, Object> baseProperties = new HashMap<>();
baseProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "hypertrace-metrics-exporter");
baseProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
baseProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
baseProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
baseProperties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
baseProperties.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
return baseProperties;
}

private Map<String, Object> getFlatMapConfig(Config config) {
// Flatten the Typesafe Config into string-keyed properties for the Kafka consumer.
Map<String, Object> propertiesMap = new HashMap<>();
config.entrySet()
.forEach(entry -> propertiesMap.put(entry.getKey(), config.getString(entry.getKey())));
return propertiesMap;
}

private Map<String, Object> mergeProperties(
Map<String, Object> baseProps, Map<String, Object> props) {
Objects.requireNonNull(baseProps);
// Entries from props override the defaults in baseProps.
baseProps.putAll(props);
return baseProps;
}
}
65 changes: 65 additions & 0 deletions hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java
@@ -0,0 +1,65 @@
package org.hypertrace.metrics.exporter;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.hypertrace.core.serviceframework.PlatformService;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsExporter extends PlatformService {

private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class);
private static final String OTLP_CONFIG_KEY = "otlp.collector.config";

private MetricsConsumer metricsConsumer;
private OtlpGrpcExporter otlpGrpcExporter;
private Config config;

public MetricsExporter(ConfigClient configClient, Config config) {
super(configClient);
this.config = config;
}

@Override
public void doInit() {
config = (config != null) ? config : getAppConfig();
metricsConsumer = new MetricsConsumer(config);
otlpGrpcExporter = new OtlpGrpcExporter(config.getConfig(OTLP_CONFIG_KEY));
}

@Override
public void doStart() {
// Consume-export loop: drain OTLP metrics from Kafka and push them to the collector
// until the service is stopped and the thread is interrupted.
while (!Thread.currentThread().isInterrupted()) {
List<ResourceMetrics> resourceMetrics = metricsConsumer.consume();
if (!resourceMetrics.isEmpty()) {
CompletableResultCode result = otlpGrpcExporter.export(resourceMetrics);
result.join(1, TimeUnit.MINUTES);
}
waitForSec(1);
}
}

@Override
public void doStop() {
metricsConsumer.close();
otlpGrpcExporter.close();
}

@Override
public boolean healthCheck() {
return true;
}

private void waitForSec(int secs) {
try {
Thread.sleep(1000L * secs);
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting to poll the next batch of records", e);
// Restore the interrupt flag so the consume-export loop above can exit.
Thread.currentThread().interrupt();
}
}
}
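For reference, a minimal sketch of the exporter configuration implied by the keys read above (input.topic, kafka.config, otlp.collector.config); the topic name, bootstrap servers, and collector settings are illustrative placeholders, not values taken from this PR:

service.name = hypertrace-metrics-exporter
input.topic = otlp-metrics # placeholder topic name
kafka.config {
  bootstrap.servers = "localhost:9092" # placeholder
}
otlp.collector.config {
  # endpoint settings consumed by OtlpGrpcExporter (class not included in this diff)
}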