diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java index 981dbc0847f..53dc27a673b 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java @@ -27,7 +27,6 @@ import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser; import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; @@ -218,16 +217,7 @@ private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) { TableSchema.builder() .column( PhysicalColumn.of( - "content", - new SeaTunnelRowType( - new String[] {"content"}, - new SeaTunnelDataType[] { - BasicType.STRING_TYPE - }), - 0, - false, - null, - null)) + "content", BasicType.STRING_TYPE, 0, false, null, null)) .build(); } return CatalogTable.of( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 595fe3042e2..ebe1ecb45e6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -231,6 +231,26 @@ public void testSourceKafkaTextToConsole(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } + @TestTemplate + public void testTextFormatWithNoSchema(TestContainer container) + throws IOException, InterruptedException { + try { + for (int i = 0; i < 100; i++) { + ProducerRecord producerRecord = + new ProducerRecord<>( + "test_topic_text_no_schema", null, "abcdef".getBytes()); + producer.send(producerRecord).get(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + producer.flush(); + } + Container.ExecResult execResult = + container.executeJob("/textFormatIT/kafka_source_text_with_no_schema.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + @TestTemplate public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf new file mode 100644 index 00000000000..bcde96c84ec --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "test_topic_text_no_schema" + plugin_output = "kafka_table" + start_mode = "earliest" + format_error_handle_way = fail + format = text + } +} + +sink { + Assert { + plugin_input = "kafka_table" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = "content" + field_type = "string" + field_value = [ + {equals_to = "abcdef"} + ] + } + ] + } + } +} \ No newline at end of file