Skip to content

Commit

Permalink
[INLONG-11349][Sort] Added ut for opentelemetryAppender
Browse files Browse the repository at this point in the history
  • Loading branch information
qy-liuhuo committed Nov 18, 2024
1 parent 7ebbd8b commit 60e418d
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;
Expand All @@ -34,6 +35,7 @@
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -111,6 +113,15 @@ public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 {
.withNetworkAliases("mysql")
.withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));

@ClassRule
public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
(OpenTelemetryContainer) new OpenTelemetryContainer()
.withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
"/otel-config.yaml")
.withCommand("--config=/otel-config.yaml")
.withNetwork(NETWORK)
.withNetworkAliases("logcollector");

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
Expand Down Expand Up @@ -152,6 +163,10 @@ public static void teardown() {
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}

if (OPEN_TELEMETRY_CONTAINER != null) {
OPEN_TELEMETRY_CONTAINER.stop();
}
}

private void initializeKafkaTable(String topic) {
Expand Down Expand Up @@ -223,5 +238,10 @@ public void testKafkaWithSqlFile() throws Exception {
"test_output1",
3,
60000L);
// check log appender
String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
if (!logs.contains("OpenTelemetryLogger installed")) {
throw new Exception("Failure to append logs to OpenTelemetry");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;

Expand All @@ -30,6 +31,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.MountableFile;

import java.net.URISyntaxException;
import java.nio.file.Path;
Expand Down Expand Up @@ -85,6 +87,15 @@ public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 {
.withNetworkAliases("mysql")
.withLogConsumer(new Slf4jLogConsumer(LOG));

@ClassRule
public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
(OpenTelemetryContainer) new OpenTelemetryContainer()
.withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
"/otel-config.yaml")
.withCommand("--config=/otel-config.yaml")
.withNetwork(NETWORK)
.withNetworkAliases("logcollector");

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
Expand Down Expand Up @@ -121,6 +132,9 @@ public static void teardown() {
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
if (OPEN_TELEMETRY_CONTAINER != null) {
OPEN_TELEMETRY_CONTAINER.stop();
}
}

/**
Expand Down Expand Up @@ -161,6 +175,11 @@ public void testMysqlUpdateAndDelete() throws Exception {
expectResult,
"test_output1",
3,
60000L);
80000L);
// check log appender
String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
if (!logs.contains("OpenTelemetryLogger installed")) {
throw new Exception("Failure to append logs to OpenTelemetry");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;

Expand All @@ -35,6 +36,7 @@
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -91,6 +93,15 @@ public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 {
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

@ClassRule
public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
(OpenTelemetryContainer) new OpenTelemetryContainer()
.withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
"/otel-config.yaml")
.withCommand("--config=/otel-config.yaml")
.withNetwork(NETWORK)
.withNetworkAliases("logcollector");

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
Expand Down Expand Up @@ -119,6 +130,9 @@ public static void teardown() {
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
if (OPEN_TELEMETRY_CONTAINER != null) {
OPEN_TELEMETRY_CONTAINER.stop();
}
}

@Test
Expand All @@ -144,6 +158,11 @@ public void testPulsarToStarRocks() throws Exception {
"1,Alice,Hello, Pulsar",
"2,Bob,Goodbye, Pulsar");
proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 60000L);
// check log appender
String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
if (!logs.contains("OpenTelemetryLogger installed")) {
throw new Exception("Failure to append logs to OpenTelemetry");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static void before() {
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withExposedPorts(DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withEnv("OTEL_EXPORTER_ENDPOINT", "logcollector:4317")
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(TM_LOG));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests.utils;

import org.testcontainers.containers.GenericContainer;

public class OpenTelemetryContainer extends GenericContainer {

public static final String IMAGE = "otel/opentelemetry-collector-contrib:0.110.0";
public static final Integer PORT = 4317;
public OpenTelemetryContainer() {
super(IMAGE);
addExposedPort(PORT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

receivers:
otlp:
protocols:
grpc:
endpoint: logcollector:4317
processors:
batch:

exporters:
debug:
verbosity: detailed

service:
pipelines:
logs:
receivers: [otlp]
processors: [batch]
exporters: [debug]
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CREATE TABLE test_input1 (
'password' = 'inlong',
'database-name' = 'test',
'table-name' = 'test_input1',
'scan.incremental.snapshot.enabled' = 'false',
'scan.incremental.snapshot.enabled' = 'true',
'jdbc.properties.useSSL' = 'false',
'jdbc.properties.allowPublicKeyRetrieval' = 'true'
);
Expand Down

0 comments on commit 60e418d

Please sign in to comment.