diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index be86cd37df91..f314f3eea0c8 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -155,7 +155,7 @@ object AvroConversionUtils { try { val schemaConverters = sparkAdapter.getAvroSchemaConverters schemaConverters.toSqlType(avroSchema) match { - case (dataType, _) => dataType.asInstanceOf[StructType] + case (dataType, _, _) => dataType.asInstanceOf[StructType] } } catch { case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to struct type: " + avroSchema, e) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala index 9b068afac83d..14dc5ed698ef 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.types.DataType */ trait HoodieAvroSchemaConverters { - def toSqlType(avroSchema: Schema): (DataType, Boolean) + def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String]) def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String = ""): Schema diff --git a/hudi-common/src/test/resources/simple-test-evolved.avsc b/hudi-common/src/test/resources/simple-test-evolved.avsc index faff2de58c00..622abfa460b9 100644 --- a/hudi-common/src/test/resources/simple-test-evolved.avsc +++ b/hudi-common/src/test/resources/simple-test-evolved.avsc @@ -24,7 +24,7 @@ {"name": "field2", "type": ["null", "string"], "default": null}, {"name": "name", "type": ["null", "string"], "default": null}, {"name": "favorite_number", "type": ["null", "long"], "default": null}, - {"name": "favorite_color", "type": ["null", "string"], "default": null}, + {"name": "favorite_color", "type": ["null", "string"], "default": null, "doc": "a quoted\"comment"}, {"name": "favorite_movie", "type": ["null", "string"], "default": null} ] } diff --git a/hudi-common/src/test/resources/simple-test.avsc b/hudi-common/src/test/resources/simple-test.avsc index 1688e2749798..cf206c29af80 100644 --- a/hudi-common/src/test/resources/simple-test.avsc +++ b/hudi-common/src/test/resources/simple-test.avsc @@ -22,6 +22,6 @@ "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": "int"}, - {"name": "favorite_color", "type": "string"} + {"name": "favorite_color", "type": "string", "doc": "a quoted\"comment"} ] } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java index 6e327bdc6120..07c623782d21 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java @@ -184,7 +184,9 @@ public static Map translateFlinkTableProperties2Spark( partitionKeys, sparkVersion, 4000, - messageType); + messageType, + // flink does not 
support comment yet + Arrays.asList()); properties.putAll(sparkTableProperties); return properties.entrySet().stream() .filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey()))) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala index 65306ac44686..2c6a2c86de46 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala @@ -28,9 +28,9 @@ import org.apache.spark.sql.types.DataType */ object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters { - override def toSqlType(avroSchema: Schema): (DataType, Boolean) = + override def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String]) = SchemaConverters.toSqlType(avroSchema) match { - case SchemaType(dataType, nullable) => (dataType, nullable) + case SchemaType(dataType, nullable, doc) => (dataType, nullable, doc) } override def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema = diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index b4e09f6d1f65..b0be65d19ff2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -45,7 +45,7 @@ private[sql] object SchemaConverters { * * @since 2.4.0 */ - case class SchemaType(dataType: DataType, nullable: Boolean) + case class SchemaType(dataType: DataType, nullable: Boolean, doc: Option[String]) /** * Converts an Avro schema to a corresponding Spark SQL schema. @@ -59,32 +59,32 @@ private[sql] object SchemaConverters { private val unionFieldMemberPrefix = "member" private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = { - avroSchema.getType match { - case INT => avroSchema.getLogicalType match { - case _: Date => SchemaType(DateType, nullable = false) - case _ => SchemaType(IntegerType, nullable = false) + (avroSchema.getType, Option(avroSchema.getDoc)) match { + case (INT, doc) => avroSchema.getLogicalType match { + case _: Date => SchemaType(DateType, nullable = false, doc) + case _ => SchemaType(IntegerType, nullable = false, doc) } - case STRING => SchemaType(StringType, nullable = false) - case BOOLEAN => SchemaType(BooleanType, nullable = false) - case BYTES | FIXED => avroSchema.getLogicalType match { + case (STRING, doc) => SchemaType(StringType, nullable = false, doc) + case (BOOLEAN, doc) => SchemaType(BooleanType, nullable = false, doc) + case (BYTES | FIXED, doc) => avroSchema.getLogicalType match { // For FIXED type, if the precision requires more bytes than fixed size, the logical // type will be null, which is handled by Avro library. 
- case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false) - case _ => SchemaType(BinaryType, nullable = false) + case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false, doc) + case _ => SchemaType(BinaryType, nullable = false, doc) } - case DOUBLE => SchemaType(DoubleType, nullable = false) - case FLOAT => SchemaType(FloatType, nullable = false) - case LONG => avroSchema.getLogicalType match { - case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false) - case _ => SchemaType(LongType, nullable = false) + case (DOUBLE, doc) => SchemaType(DoubleType, nullable = false, doc) + case (FLOAT, doc) => SchemaType(FloatType, nullable = false, doc) + case (LONG, doc) => avroSchema.getLogicalType match { + case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false, doc) + case _ => SchemaType(LongType, nullable = false, doc) } - case ENUM => SchemaType(StringType, nullable = false) + case (ENUM, doc) => SchemaType(StringType, nullable = false, doc) - case NULL => SchemaType(NullType, nullable = true) + case (NULL, doc) => SchemaType(NullType, nullable = true, doc) - case RECORD => + case (RECORD, doc) => if (existingRecordNames.contains(avroSchema.getFullName)) { throw new IncompatibleSchemaException( s""" @@ -95,24 +95,25 @@ private[sql] object SchemaConverters { val newRecordNames = existingRecordNames + avroSchema.getFullName val fields = avroSchema.getFields.asScala.map { f => val schemaType = toSqlTypeHelper(f.schema(), newRecordNames) - StructField(f.name, schemaType.dataType, schemaType.nullable) + val metadata = if(f.doc != null) new MetadataBuilder().putString("comment", f.doc).build() else Metadata.empty + StructField(f.name, schemaType.dataType, schemaType.nullable, metadata) } - SchemaType(StructType(fields.toSeq), nullable = false) + SchemaType(StructType(fields.toSeq), nullable = false, doc) - case ARRAY => + case (ARRAY, doc) => val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames) SchemaType( ArrayType(schemaType.dataType, containsNull = schemaType.nullable), - nullable = false) + nullable = false, doc) - case MAP => + case (MAP, doc) => val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames) SchemaType( MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), - nullable = false) + nullable = false, doc) - case UNION => + case (UNION, doc) => if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { // In case of a union with null, eliminate it and make a recursive call val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) @@ -126,9 +127,9 @@ private[sql] object SchemaConverters { case Seq(t1) => toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames) case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => - SchemaType(LongType, nullable = false) + SchemaType(LongType, nullable = false, doc) case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => - SchemaType(DoubleType, nullable = false) + SchemaType(DoubleType, nullable = false, doc) case _ => // Convert complex unions to struct types where field names are member0, member1, etc. // This is consistent with the behavior when converting between Avro and Parquet. 
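To make the intent of the SchemaConverters change concrete: the patch threads the Avro `doc` through `SchemaType` and, for record fields, stores it under the `comment` key of the resulting `StructField` metadata. Below is a minimal, standalone sketch of that mapping — it is not the Hudi code itself, the object name and the tiny type mapping are made up for illustration, and only two primitive types are handled.

```scala
import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.spark.sql.types.{DataType, DoubleType, Metadata, MetadataBuilder, StringType, StructField, StructType}

// Sketch only: mirrors how the patched toSqlTypeHelper surfaces an Avro field's
// "doc" as a "comment" entry in the StructField metadata.
object AvroDocCommentSketch {

  private def toCatalyst(s: Schema): DataType = s.getType match {
    case Schema.Type.STRING => StringType
    case Schema.Type.DOUBLE => DoubleType
    case other => throw new IllegalArgumentException(s"sketch only handles STRING/DOUBLE, got $other")
  }

  def main(args: Array[String]): Unit = {
    val avroSchema = new Schema.Parser().parse(
      """{"type":"record","name":"trip","fields":[
        |  {"name":"rider","type":"string","doc":"rider id"},
        |  {"name":"fare","type":"double"}
        |]}""".stripMargin)

    val structType = StructType(avroSchema.getFields.asScala.map { f =>
      // Same pattern as the patch: a non-null doc becomes {"comment": doc}, otherwise empty metadata.
      val metadata =
        if (f.doc() != null) new MetadataBuilder().putString("comment", f.doc()).build()
        else Metadata.empty
      StructField(f.name(), toCatalyst(f.schema()), nullable = false, metadata)
    }.toSeq)

    // Expected: rider carries {"comment":"rider id"}; fare has empty metadata.
    structType.fields.foreach(f => println(s"${f.name} -> ${f.metadata.json}"))
  }
}
```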
@@ -136,10 +137,11 @@ private[sql] object SchemaConverters { case (s, i) => val schemaType = toSqlTypeHelper(s, existingRecordNames) // All fields are nullable because only one of them is set at a time - StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true) + val metadata = if(schemaType.doc.isDefined) new MetadataBuilder().putString("comment", schemaType.doc.get).build() else Metadata.empty + StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true, metadata) } - SchemaType(StructType(fields.toSeq), nullable = false) + SchemaType(StructType(fields.toSeq), nullable = false, doc) } case other => throw new IncompatibleSchemaException(s"Unsupported type $other") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnComments.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnComments.scala new file mode 100644 index 000000000000..878293e4fe3d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnComments.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.spark.sql._ +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.SparkContext +import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.common.model.{HoodieTableType} +import org.apache.spark.sql.types.StructType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource + + +class TestColumnComments { + var spark : SparkSession = _ + var sqlContext: SQLContext = _ + var sc : SparkContext = _ + + def initSparkContext(): Unit = { + val sparkConf = getSparkConfForTest(getClass.getSimpleName) + spark = SparkSession.builder() + .withExtensions(new HoodieSparkSessionExtension) + .config(sparkConf) + .getOrCreate() + sc = spark.sparkContext + sc.setLogLevel("ERROR") + sqlContext = spark.sqlContext + } + + @BeforeEach + def setUp() { + initSparkContext() + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType], names = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testColumnCommentWithSparkDatasource(tableType: HoodieTableType): Unit = { + val basePath = java.nio.file.Files.createTempDirectory("hoodie_comments_path").toAbsolutePath.toString + val opts = Map( + HoodieWriteConfig.TBL_NAME.key -> "hoodie_comments", + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.toString, + DataSourceWriteOptions.OPERATION.key -> "bulk_insert", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition" + ) + val inputDF = spark.sql("select '0' as _row_key, '1' as content, '2' as partition, '3' as ts") + val struct = new StructType() + .add("_row_key", "string", true, "dummy comment") + .add("content", "string", true) + .add("partition", "string", true) + .add("ts", "string", true) + spark.createDataFrame(inputDF.rdd, struct) + .write.format("hudi") + .options(opts) + .mode(SaveMode.Overwrite) + .save(basePath) + spark.read.format("hudi").load(basePath).registerTempTable("test_tbl") + + // now confirm the comment is present at read time + assertEquals(1, spark.sql("desc extended test_tbl") + .filter("col_name = '_row_key' and comment = 'dummy comment'").count) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala index bf68e34fb648..65de50bb1e0f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala @@ -49,7 +49,7 @@ class TestAvroSerDe extends SparkAdapterSupport { } val avroSchema = HoodieMetadataColumnStats.SCHEMA$ - val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(avroSchema) + val SchemaType(catalystSchema, _, _) = SchemaConverters.toSqlType(avroSchema) val deserializer = sparkAdapter.createAvroDeserializer(avroSchema, catalystSchema) val serializer = sparkAdapter.createAvroSerializer(catalystSchema, avroSchema, nullable = false) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala index 3d172fecdf62..0c2294d54f60 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala @@ -28,7 +28,7 @@ class TestSchemaConverters { def testAvroUnionConversion(): Unit = { val originalAvroSchema = HoodieMetadataColumnStats.SCHEMA$ - val SchemaType(convertedStructType, _) = SchemaConverters.toSqlType(originalAvroSchema) + val SchemaType(convertedStructType, _, _) = SchemaConverters.toSqlType(originalAvroSchema) val convertedAvroSchema = SchemaConverters.toAvroType(convertedStructType) // NOTE: Here we're validating that converting Avro -> Catalyst and Catalyst -> Avro are inverse diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java index 2fa6a9cbe53f..f174f23b14c7 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java @@ -24,6 +24,7 @@ import org.apache.hudi.hive.SchemaDifference; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.sync.common.HoodieSyncTool; +import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.hudi.sync.common.model.PartitionEvent; import org.apache.hudi.sync.common.model.PartitionEvent.PartitionEventType; import org.apache.hudi.sync.common.util.ConfigUtils; @@ -212,8 +213,9 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi Map tableProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_TABLE_PROPERTIES)); Map serdeProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_SERDE_PROPERTIES)); if (config.getBoolean(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE)) { + List fromStorage = syncClient.getStorageFieldSchemas(); Map sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS), - config.getString(META_SYNC_SPARK_VERSION), config.getInt(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema); + config.getString(META_SYNC_SPARK_VERSION), config.getInt(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema, fromStorage); Map sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH)); tableProperties.putAll(sparkTableProperties); serdeProperties.putAll(sparkSerdeProperties); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 74cb90de0209..c1a83c725829 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -105,7 +105,9 @@ public class HiveSyncConfigHolder { .key("hoodie.datasource.hive_sync.sync_as_datasource") .defaultValue("true") .markAdvanced() - .withDocumentation(""); + .withDocumentation("Add information to setup the spark datasource, including tables properties and spark schema." + + " This allow spark to use optimized reader." 
+ + " Column comments are also added for the first level only."); public static final ConfigProperty HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty .key("hoodie.datasource.hive_sync.schema_string_length_thresh") .defaultValue(4000) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 65a2262a2db0..1bd1e7f2acfb 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -298,8 +298,9 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, private Map getTableProperties(MessageType schema) { Map tableProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_PROPERTIES)); if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { + List fromStorage = syncClient.getStorageFieldSchemas(); Map sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS), - config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema); + config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema, fromStorage); tableProperties.putAll(sparkTableProperties); } return tableProperties; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 6f20d27d20b0..5fad890f4b7e 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -90,7 +90,7 @@ private List updateHiveSQLs(List sqls) { for (String sql : sqls) { if (hiveDriver != null) { HoodieTimer timer = HoodieTimer.start(); - responses.add(hiveDriver.run(sql)); + responses.add(hiveDriver.run(escapeAntiSlash(sql))); LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, timer.endTimer())); } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java index 026bf880835b..251beea30cb5 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -65,7 +65,7 @@ public void runSQL(String s) { try { stmt = connection.createStatement(); LOG.info("Executing SQL " + s); - stmt.execute(s); + stmt.execute(escapeAntiSlash(s)); } catch (SQLException e) { throw new HoodieHiveSyncException("Failed in executing SQL " + s, e); } finally { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 1c4dcec592e7..669e5cef17a1 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE; @@ -220,5 +221,16 @@ private List constructChangePartitions(String tableName, List pa } return changePartitions; } + + /** + * SQL statement should be be escaped in order to consider anti-slash + * + * For eg: \foo should be transformed into \\\foo + * @param sql + * @return + */ + protected String escapeAntiSlash(String sql) { + return sql.replaceAll("\\\\", Matcher.quoteReplacement("\\\\\\")); + } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 377e72b0ae39..eaf2b7d546c6 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -601,7 +601,7 @@ private String getSparkTableProperties(boolean syncAsDataSourceTable, boolean us + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + "{\"name\":\"favorite_number\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}," - + "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"a quoted\\\"comment\"}}," + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" + "spark.sql.sources.schema.partCol.0\tdatestr\n"; } else { @@ -611,7 +611,7 @@ private String getSparkTableProperties(boolean syncAsDataSourceTable, boolean us + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":" + "\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_number\",\"type\":\"integer\"," + "\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false," - + "\"metadata\":{}}]}\n" + + "\"metadata\":{\"comment\":\"a quoted\\\"comment\"}}]}\n" + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" + "spark.sql.sources.schema.partCol.0\tdatestr\n"; } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java index b6940629af3d..a7be85c6d9c6 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.hive; +import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils; import org.apache.spark.sql.execution.SparkSqlParser; import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter; @@ -31,6 +32,9 @@ import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.List; + import static org.junit.jupiter.api.Assertions.assertEquals; public class TestParquet2SparkSchemaUtils { @@ -59,7 +63,7 @@ public void testConvertPrimitiveType() { + " f11 tinyint, f12 smallint, f13 binary, f14 boolean"); String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchema).asGroupType()); + 
spark2ParquetConverter.convert(sparkSchema).asGroupType(), Arrays.asList()); StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson); assertEquals(sparkSchema.json(), convertedSparkSchema.json()); // Test type with nullable @@ -67,7 +71,7 @@ public void testConvertPrimitiveType() { StructField field1 = new StructField("f1", StringType$.MODULE$, true, Metadata.empty()); StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1}); String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType()); + spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType(), Arrays.asList()); StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson); assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json()); } @@ -79,7 +83,7 @@ public void testConvertComplexType() { + ",f3 map, bigint>, f4 array>" + ",f5 struct"); String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchema).asGroupType()); + spark2ParquetConverter.convert(sparkSchema).asGroupType(), Arrays.asList()); StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson); assertEquals(sparkSchema.json(), convertedSparkSchema.json()); // Test complex type with nullable @@ -87,8 +91,22 @@ public void testConvertComplexType() { StructField field1 = new StructField("f1", new MapType(StringType$.MODULE$, IntegerType$.MODULE$, true), false, Metadata.empty()); StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1}); String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType()); + spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType(), Arrays.asList()); StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson); assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json()); } + + @Test + public void testConvertPrimitiveTypeWithComment() { + StructType sparkSchema = parser.parseTableSchema( + "f0 int COMMENT \"first comment\", f1 string COMMENT \"double quote\\\"comment\""); + List fieldSchema = Arrays.asList( + new FieldSchema("f0", "int", "first comment"), + new FieldSchema("f1", "string", "double quote\"comment")); + + String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( + spark2ParquetConverter.convert(sparkSchema).asGroupType(), fieldSchema); + StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson); + assertEquals(sparkSchema.json(), convertedSparkSchema.json()); + } } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java index c5b98c17eb4a..1b5f67bbcbb6 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java @@ -19,33 +19,42 @@ package org.apache.hudi.sync.common.util; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.sync.common.model.FieldSchema; + import org.apache.parquet.schema.GroupType; import 
org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; + import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; /** * Convert the parquet schema to spark schema' json string. * This code is refer to org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter * in spark project. + * + * Currently, due to limitations in the FieldSchema only the first level of fields get comments + * extracted, if exist. */ public class Parquet2SparkSchemaUtils { - public static String convertToSparkSchemaJson(GroupType parquetSchema) { + public static String convertToSparkSchemaJson(GroupType parquetSchema, List fieldSchema) { String fieldsJsonString = parquetSchema.getFields().stream().map(field -> { switch (field.getRepetition()) { case OPTIONAL: return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field) - + ",\"nullable\":true,\"metadata\":{}}"; + + ",\"nullable\":true,\"metadata\":{" + getComment(field.getName(), fieldSchema) + "}}"; case REQUIRED: return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field) - + ",\"nullable\":false,\"metadata\":{}}"; + + ",\"nullable\":false,\"metadata\":{" + getComment(field.getName(), fieldSchema) + "}}"; case REPEATED: String arrayType = arrayType(field, false); return "{\"name\":\"" + field.getName() + "\",\"type\":" + arrayType - + ",\"nullable\":false,\"metadata\":{}}"; + + ",\"nullable\":false,\"metadata\":{" + getComment(field.getName(), fieldSchema) + "}}"; default: throw new UnsupportedOperationException("Unsupport convert " + field + " to spark sql type"); } @@ -53,6 +62,18 @@ public static String convertToSparkSchemaJson(GroupType parquetSchema) { return "{\"type\":\"struct\",\"fields\":[" + fieldsJsonString + "]}"; } + private static String getComment(String name, List fromStorage) { + return fromStorage.stream() + .filter(f -> name.equals(f.getName())) + .filter(f -> f.getComment().isPresent()) + .map(f -> "\"comment\":\"" + escapeQuote(f.getComment().get()) + "\"") + .findFirst().orElse(""); + } + + private static String escapeQuote(String s) { + return s.replaceAll("\"", Matcher.quoteReplacement("\\\"")); + } + private static String convertFieldType(Type field) { if (field instanceof PrimitiveType) { return "\"" + convertPrimitiveType((PrimitiveType) field) + "\""; @@ -133,7 +154,7 @@ private static String convertPrimitiveType(PrimitiveType field) { private static String convertGroupField(GroupType field) { if (field.getOriginalType() == null) { - return convertToSparkSchemaJson(field); + return convertToSparkSchemaJson(field, Arrays.asList()); } switch (field.getOriginalType()) { case LIST: diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java index 2a9f783a43bc..0454ff9a6713 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.sync.common.util; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; @@ 
-36,11 +37,13 @@ public class SparkDataSourceTableUtils { /** * Get Spark Sql related table properties. This is used for spark datasource table. - * @param schema The schema to write to the table. + * + * @param schema The schema to write to the table. + * @param fieldSchemaForComments * @return A new parameters added the spark's table properties. */ public static Map getSparkTableProperties(List partitionNames, String sparkVersion, - int schemaLengthThreshold, MessageType schema) { + int schemaLengthThreshold, MessageType schema, List fieldSchemaForComments) { // Convert the schema and partition info used by spark sql to hive table properties. // The following code refers to the spark code in // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -77,7 +80,7 @@ public static Map getSparkTableProperties(List partition sparkProperties.put("spark.sql.create.version", sparkVersion); } // Split the schema string to multi-parts according the schemaLengthThreshold size. - String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType); + String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType, fieldSchemaForComments); int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold; sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart)); // Add each part of schema string to sparkProperties
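On the sync side, `getSparkTableProperties` now receives the first-level `FieldSchema`s read from storage, and `Parquet2SparkSchemaUtils` injects each column's comment (with embedded quotes escaped) into the `spark.sql.sources.schema.part.N` JSON; the DDL executors additionally escape backslashes in the generated SQL (`escapeAntiSlash`) so the escaped quote survives the Hive round trip. The sketch below mirrors that metadata construction with assumed names — it is not Hudi's API, just an illustration of the escaping and JSON fragment produced for a commented column.

```scala
import java.util.regex.Matcher

// Sketch only: mimics the private getComment/escapeQuote helpers added to
// Parquet2SparkSchemaUtils, which embed a first-level column comment into the
// Spark schema JSON stored in the table properties.
object CommentMetadataSketch {

  // Escape double quotes inside the comment, as the patch does.
  private def escapeQuote(s: String): String =
    s.replaceAll("\"", Matcher.quoteReplacement("\\\""))

  // Emit a metadata body only when the column has a comment coming from storage.
  private def commentMetadata(name: String, comments: Map[String, String]): String =
    comments.get(name).map(c => "\"comment\":\"" + escapeQuote(c) + "\"").getOrElse("")

  def main(args: Array[String]): Unit = {
    val comments = Map("favorite_color" -> "a quoted\"comment")
    val json =
      "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false," +
        "\"metadata\":{" + commentMetadata("favorite_color", comments) + "}}"
    // Expected: {"name":"favorite_color","type":"string","nullable":false,
    //            "metadata":{"comment":"a quoted\"comment"}}
    // which matches the expected property string asserted in TestHiveSyncTool.
    println(json)
  }
}
```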