diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java index 70e6b2413ed..d6bcca2fa1a 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java @@ -20,6 +20,7 @@ import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MySqlContainer; +import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer; import org.apache.inlong.sort.tests.utils.PlaceholderResolver; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -34,6 +35,7 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; import java.io.IOException; import java.net.URI; @@ -111,6 +113,15 @@ public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 { .withNetworkAliases("mysql") .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG)); + @ClassRule + public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER = + (OpenTelemetryContainer) new OpenTelemetryContainer() + .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"), + "/otel-config.yaml") + .withCommand("--config=/otel-config.yaml") + .withNetwork(NETWORK) + .withNetworkAliases("logcollector"); + @Before public void setup() { waitUntilJobRunning(Duration.ofSeconds(30)); @@ -152,6 +163,10 @@ public static void teardown() { if (STAR_ROCKS != null) { STAR_ROCKS.stop(); } + + if (OPEN_TELEMETRY_CONTAINER != null) { + OPEN_TELEMETRY_CONTAINER.stop(); + } } private void initializeKafkaTable(String topic) { @@ -223,5 +238,10 @@ public void testKafkaWithSqlFile() throws Exception { "test_output1", 3, 60000L); + // check log appender + String logs = OPEN_TELEMETRY_CONTAINER.getLogs(); + if (!logs.contains("OpenTelemetryLogger installed")) { + throw new Exception("Failure to append logs to OpenTelemetry"); + } } } \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java index 6b7a5aa644e..e2867d7f9d2 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java @@ -20,6 +20,7 @@ import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MySqlContainer; +import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -30,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.MountableFile; import java.net.URISyntaxException; import java.nio.file.Path; @@ -85,6 +87,15 @@ public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 { .withNetworkAliases("mysql") .withLogConsumer(new Slf4jLogConsumer(LOG)); + @ClassRule + public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER = + (OpenTelemetryContainer) new OpenTelemetryContainer() + .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"), + "/otel-config.yaml") + .withCommand("--config=/otel-config.yaml") + .withNetwork(NETWORK) + .withNetworkAliases("logcollector"); + @Before public void setup() { waitUntilJobRunning(Duration.ofSeconds(30)); @@ -121,6 +132,9 @@ public static void teardown() { if (STAR_ROCKS != null) { STAR_ROCKS.stop(); } + if (OPEN_TELEMETRY_CONTAINER != null) { + OPEN_TELEMETRY_CONTAINER.stop(); + } } /** @@ -161,6 +175,11 @@ public void testMysqlUpdateAndDelete() throws Exception { expectResult, "test_output1", 3, - 60000L); + 80000L); + // check log appender + String logs = OPEN_TELEMETRY_CONTAINER.getLogs(); + if (!logs.contains("OpenTelemetryLogger installed")) { + throw new Exception("Failure to append logs to OpenTelemetry"); + } } } \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java index e5252d0b4a2..9c40577778c 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java @@ -19,6 +19,7 @@ import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; +import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -35,6 +36,7 @@ import org.testcontainers.containers.PulsarContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; import java.net.URI; import java.net.URISyntaxException; @@ -91,6 +93,15 @@ public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 { .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); + @ClassRule + public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER = + (OpenTelemetryContainer) new OpenTelemetryContainer() + .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"), + "/otel-config.yaml") + .withCommand("--config=/otel-config.yaml") + .withNetwork(NETWORK) + .withNetworkAliases("logcollector"); + @Before public void setup() { waitUntilJobRunning(Duration.ofSeconds(30)); @@ -119,6 +130,9 @@ public static void teardown() { if (STAR_ROCKS != null) { STAR_ROCKS.stop(); } + if (OPEN_TELEMETRY_CONTAINER != null) { + OPEN_TELEMETRY_CONTAINER.stop(); + } } @Test @@ -144,6 +158,11 @@ public void testPulsarToStarRocks() throws Exception { "1,Alice,Hello, Pulsar", "2,Bob,Goodbye, Pulsar"); proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 60000L); + // check log appender + String logs = OPEN_TELEMETRY_CONTAINER.getLogs(); + if (!logs.contains("OpenTelemetryLogger installed")) { + throw new Exception("Failure to append logs to OpenTelemetry"); + } } } diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java index a59d9c9e982..83c14a113b3 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java @@ -45,6 +45,7 @@ public static void before() { .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) .withExposedPorts(DEBUG_PORT) .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withEnv("OTEL_EXPORTER_ENDPOINT", "logcollector:4317") .dependsOn(jobManager) .withLogConsumer(new Slf4jLogConsumer(TM_LOG)); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java new file mode 100644 index 00000000000..0d07e85a9e8 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.testcontainers.containers.GenericContainer; + +public class OpenTelemetryContainer extends GenericContainer { + + public static final String IMAGE = "otel/opentelemetry-collector-contrib:0.110.0"; + public static final Integer PORT = 4317; + public OpenTelemetryContainer() { + super(IMAGE); + addExposedPort(PORT); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml new file mode 100644 index 00000000000..ad48b9cb07e --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +receivers: + otlp: + protocols: + grpc: + endpoint: logcollector:4317 +processors: + batch: + +exporters: + debug: + verbosity: detailed + +service: + pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql index 9f74d54ae75..6c4a3efb84c 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql @@ -10,7 +10,7 @@ CREATE TABLE test_input1 ( 'password' = 'inlong', 'database-name' = 'test', 'table-name' = 'test_input1', - 'scan.incremental.snapshot.enabled' = 'false', + 'scan.incremental.snapshot.enabled' = 'true', 'jdbc.properties.useSSL' = 'false', 'jdbc.properties.allowPublicKeyRetrieval' = 'true' );