Integrate source performance harness with datadog (airbytehq#31410)
Co-authored-by: xiaohansong <[email protected]>
2 people authored and ariesgun committed Oct 23, 2023
1 parent 2183055 commit 62bb25b
Showing 5 changed files with 64 additions and 6 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/connector-performance-command.yml
@@ -183,13 +183,10 @@ jobs:
connector_name=$(echo $CONN | cut -d / -f 2)
kind load docker-image airbyte/$connector_name:dev --name chart-testing
kind load docker-image airbyte/$HARNESS_TYPE:dev --name chart-testing
# envsubst requires variables to be exported
# envsubst requires variables to be exported or setup in the env field in this step.
export CONNECTOR_IMAGE_NAME=${CONN/connectors/airbyte}:dev
export DATASET=$DS
export STREAM_NUMBER=$STREAM_NUMBER
export SYNC_MODE=$SYNC_MODE
export HARNESS=$HARNESS_TYPE
export DD_API_KEY=$DD_API_KEY
envsubst < ./tools/bin/run-harness-process.yaml | kubectl create -f -
echo "harness is ${{ steps.which-harness.outputs.harness_type }}"
POD=$(kubectl get pod -l app=performance-harness -o jsonpath="{.items[0].metadata.name}")
@@ -16,4 +16,5 @@ dependencies {
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'io.airbyte:airbyte-commons-worker:0.42.0'
implementation 'io.airbyte.airbyte-config:config-models:0.42.0'
implementation 'com.datadoghq:datadog-api-client:2.16.0'
}
@@ -78,6 +78,7 @@ public static void main(final String[] args) {
try {
final PerformanceTest test = new PerformanceTest(
image,
dataset,
config.toString(),
catalog.toString());
test.runTest();
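The only change in this hunk is threading the new dataset argument into the PerformanceTest constructor. For context, a hypothetical call site with illustrative values; the argument parsing sits above this hunk and is not shown in the diff, so the literals below are assumptions:

    // Hypothetical wiring -- the real main() parses these from the harness arguments.
    final String image = "airbyte/source-postgres:dev";  // connector image under test
    final String dataset = "1m";                         // dataset label, used to tag the Datadog series
    final String config = "{ \"host\": \"...\" }";       // source config as a JSON string
    final String catalog = "{ \"streams\": [] }";        // ConfiguredAirbyteCatalog as a JSON string
    final PerformanceTest test = new PerformanceTest(image, dataset, config, catalog);
    test.runTest();

The dataset string is stored on the test and later attached as a resource on every submitted metric series, so runs against different datasets can be told apart in Datadog.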
@@ -4,6 +4,15 @@

package io.airbyte.integrations.source_performance;

import com.datadog.api.client.ApiClient;
import com.datadog.api.client.ApiException;
import com.datadog.api.client.v2.api.MetricsApi;
import com.datadog.api.client.v2.model.IntakePayloadAccepted;
import com.datadog.api.client.v2.model.MetricIntakeType;
import com.datadog.api.client.v2.model.MetricPayload;
import com.datadog.api.client.v2.model.MetricPoint;
import com.datadog.api.client.v2.model.MetricResource;
import com.datadog.api.client.v2.model.MetricSeries;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,6 +35,8 @@
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -43,17 +54,24 @@ public class PerformanceTest {

public static final double MEGABYTE = Math.pow(1024, 2);
private final String imageName;
private final String dataset;
private final JsonNode config;
private final ConfiguredAirbyteCatalog catalog;

PerformanceTest(final String imageName, final String config, final String catalog) throws JsonProcessingException {
PerformanceTest(final String imageName, final String dataset, final String config, final String catalog) throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();
this.imageName = imageName;
this.dataset = dataset;
this.config = mapper.readTree(config);
this.catalog = Jsons.deserialize(catalog, ConfiguredAirbyteCatalog.class);
}

void runTest() throws Exception {

// Initialize datadog.
ApiClient defaultClient = ApiClient.getDefaultApiClient();
MetricsApi apiInstance = new MetricsApi(defaultClient);

KubePortManagerSingleton.init(PORTS);

final KubernetesClient fabricClient = new DefaultKubernetesClient();
@@ -105,8 +123,44 @@ void runTest() throws Exception {
final var totalMB = totalBytes / MEGABYTE;
final var totalTimeSecs = (end - start) / 1000.0;
final var rps = counter / totalTimeSecs;
log.info("total secs: {}. total MB read: {}, rps: {}, throughput: {}", totalTimeSecs, totalMB, rps, totalMB / totalTimeSecs);
final var throughput = totalMB / totalTimeSecs;
log.info("total secs: {}. total MB read: {}, rps: {}, throughput: {}", totalTimeSecs, totalMB, rps, throughput);
source.close();

final long reportingTimeInEpochSeconds = OffsetDateTime.now().toInstant().getEpochSecond();

List<MetricResource> metricResources = List.of(
new MetricResource().name("github").type("runner"),
new MetricResource().name(imageName).type("image"),
new MetricResource().name(dataset).type("dataset"));
MetricPayload body =
new MetricPayload()
.series(
List.of(
new MetricSeries()
.metric("connectors.performance.rps")
.type(MetricIntakeType.GAUGE)
.points(
Collections.singletonList(
new MetricPoint()
.timestamp(reportingTimeInEpochSeconds)
.value(rps)))
.resources(metricResources),
new MetricSeries()
.metric("connectors.performance.throughput")
.type(MetricIntakeType.GAUGE)
.points(
Collections.singletonList(
new MetricPoint()
.timestamp(reportingTimeInEpochSeconds)
.value(throughput)))
.resources(metricResources)));
try {
IntakePayloadAccepted result = apiInstance.submitMetrics(body);
System.out.println(result);
} catch (ApiException e) {
log.error("Exception when calling MetricsApi#submitMetrics.", e);
}
}

private static <V0, V1> V0 convertProtocolObject(final V1 v1, final Class<V0> klass) {
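Pulled out of the harness, the Datadog interaction added above reduces to the following self-contained sketch. It mirrors the calls in the diff (ApiClient.getDefaultApiClient(), MetricsApi#submitMetrics, a gauge-typed MetricSeries with a single point); the metric name is taken from the diff, while the resource tag values here are illustrative:

    import com.datadog.api.client.ApiClient;
    import com.datadog.api.client.ApiException;
    import com.datadog.api.client.v2.api.MetricsApi;
    import com.datadog.api.client.v2.model.IntakePayloadAccepted;
    import com.datadog.api.client.v2.model.MetricIntakeType;
    import com.datadog.api.client.v2.model.MetricPayload;
    import com.datadog.api.client.v2.model.MetricPoint;
    import com.datadog.api.client.v2.model.MetricResource;
    import com.datadog.api.client.v2.model.MetricSeries;
    import java.time.OffsetDateTime;
    import java.util.List;

    public class SubmitGaugeExample {

      public static void main(final String[] args) {
        // The default client picks up its credentials from the environment,
        // which is why the pod spec below injects DD_API_KEY and DD_SITE.
        final MetricsApi metricsApi = new MetricsApi(ApiClient.getDefaultApiClient());

        final long now = OffsetDateTime.now().toInstant().getEpochSecond();
        final MetricPayload body = new MetricPayload().series(List.of(
            new MetricSeries()
                .metric("connectors.performance.rps")  // gauge name from the diff
                .type(MetricIntakeType.GAUGE)
                .points(List.of(new MetricPoint().timestamp(now).value(42.0)))
                .resources(List.of(new MetricResource().name("demo").type("dataset")))));

        try {
          final IntakePayloadAccepted result = metricsApi.submitMetrics(body);
          System.out.println(result);
        } catch (final ApiException e) {
          System.err.println("Metric submission failed: " + e);
        }
      }
    }

Each series carries one GAUGE point stamped at submission time, so every harness run appends a fresh point to the same two series, tagged with the runner, image, and dataset resources.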
5 changes: 5 additions & 0 deletions tools/bin/run-harness-process.yaml
@@ -20,6 +20,11 @@ spec:
requests:
cpu: "2.5"
memory: "2Gi"
env:
- name: DD_API_KEY
value: $DD_API_KEY
- name: DD_SITE
value: "datadoghq.com"
volumes:
- name: secrets-volume
hostPath:
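Two notes on this hunk. First, $DD_API_KEY here is not Kubernetes syntax: the workflow pipes this manifest through envsubst before kubectl create -f -, which is why the variable must be exported in the calling step. Second, these two environment variables are all the harness needs, since the Datadog Java client's default instance reads DD_API_KEY and DD_SITE from its environment. If explicit configuration were preferred, the client can also be set up in code; the following is a sketch based on the datadog-api-client README, with method names assumed from that documentation rather than from this diff:

    import com.datadog.api.client.ApiClient;
    import java.util.HashMap;

    public class ExplicitDatadogConfig {

      public static void main(final String[] args) {
        final ApiClient client = ApiClient.getDefaultApiClient();

        // Equivalent to exporting DD_API_KEY before starting the JVM.
        final HashMap<String, String> secrets = new HashMap<>();
        secrets.put("apiKeyAuth", System.getenv("DD_API_KEY"));
        client.configureApiKeys(secrets);

        // Equivalent to DD_SITE; routes intake to the right Datadog site.
        final HashMap<String, String> serverVariables = new HashMap<>();
        serverVariables.put("site", "datadoghq.com");
        client.setServerVariables(serverVariables);
      }
    }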
