[HUDI-5533] Support spark columns comments #8683

Open · wants to merge 12 commits into base: master
@@ -149,7 +149,7 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     val schemaConverters = sparkAdapter.getAvroSchemaConverters
     schemaConverters.toSqlType(avroSchema) match {
-      case (dataType, _) => dataType.asInstanceOf[StructType]
+      case (dataType, _, _) => dataType.asInstanceOf[StructType]
     }
   }
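For context, a small sketch (plain Spark APIs, not part of this diff; the field name and comment text are made up) of what the new third tuple element ultimately feeds: Spark stores a column comment under the "comment" key of a StructField's metadata, which DESCRIBE then surfaces.

import org.apache.spark.sql.types._

// Hypothetical field carrying a comment, the way the patched converter builds it:
val metadata = new MetadataBuilder().putString("comment", "unique user id").build()
val schema = StructType(Seq(
  StructField("user_id", LongType, nullable = false, metadata)
))

// Spark SQL surfaces this metadata key as the column comment:
schema.fields.foreach { f =>
  val comment = if (f.metadata.contains("comment")) f.metadata.getString("comment") else "<none>"
  println(s"${f.name}: $comment")
}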
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.DataType
  */
 trait HoodieAvroSchemaConverters {

-  def toSqlType(avroSchema: Schema): (DataType, Boolean)
+  def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String])

   def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String = ""): Schema
@@ -28,9 +28,9 @@ import org.apache.spark.sql.types.DataType
  */
 object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters {

-  override def toSqlType(avroSchema: Schema): (DataType, Boolean) =
+  override def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String]) =
     SchemaConverters.toSqlType(avroSchema) match {
-      case SchemaType(dataType, nullable) => (dataType, nullable)
+      case SchemaType(dataType, nullable, doc) => (dataType, nullable, doc)
     }

   override def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
@@ -45,7 +45,7 @@ private[sql] object SchemaConverters {
  *
  * @since 2.4.0
  */
-case class SchemaType(dataType: DataType, nullable: Boolean)
+case class SchemaType(dataType: DataType, nullable: Boolean, doc: Option[String])

/**
 * Converts an Avro schema to a corresponding Spark SQL schema.
@@ -59,32 +59,32 @@ private[sql] object SchemaConverters {
   private val unionFieldMemberPrefix = "member"

   private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
-    avroSchema.getType match {
-      case INT => avroSchema.getLogicalType match {
-        case _: Date => SchemaType(DateType, nullable = false)
-        case _ => SchemaType(IntegerType, nullable = false)
+    (avroSchema.getType, Option(avroSchema.getDoc)) match {
Contributor:

The conversion tool is copied from Spark: https://github.com/apache/spark/blob/dd4db21cb69a9a9c3715360673a76e6f150303d4/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L58. Just noticed that Spark also does not support keeping comments from Avro fields while doing the conversion.

Contributor Author:

Spark likely has the same limitation when retrieving a schema from Avro. But Spark doesn't usually infer a Spark schema from Avro; Hudi does, and that is the reason for this patch.

Contributor:

Can you write a test case for it, especially for creating a table?

Contributor Author:

sure

Contributor Author:

@danny0405 added a test

Contributor:

Kind of feel there is no need to change each match arm for every data type; can we write another method similar to toSqlTypeHelper which invokes toSqlTypeHelper first and then fixes the comment separately?

Contributor Author:

> Kind of feel there is no need to change each match arm for every data type

Any column, including nested columns, may have a comment, so I don't see why we shouldn't inspect all of the Avro content for docs.

> which invokes toSqlTypeHelper first and then fixes the comment separately

That would mean walking through the Avro schema twice, and it would also lead to a complex merge of the results. Am I missing something?
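For illustration, a minimal sketch of the create-table check requested above (not the PR's actual test; it assumes a SparkSession `spark` with the Hudi SQL extensions enabled, and the table name is made up):

// Hypothetical test body: create a Hudi table with a column comment and
// check that DESCRIBE reports the comment back.
spark.sql(
  """create table h_comment_demo (
    |  id int comment 'record key',
    |  name string
    |) using hudi
    |tblproperties (primaryKey = 'id')""".stripMargin)

val described = spark.sql("describe table h_comment_demo")
  .select("col_name", "comment")
  .collect()
assert(described.exists(r => r.getString(0) == "id" && r.getString(1) == "record key"))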

+      case (INT, doc) => avroSchema.getLogicalType match {
+        case _: Date => SchemaType(DateType, nullable = false, doc)
+        case _ => SchemaType(IntegerType, nullable = false, doc)
       }
-      case STRING => SchemaType(StringType, nullable = false)
-      case BOOLEAN => SchemaType(BooleanType, nullable = false)
-      case BYTES | FIXED => avroSchema.getLogicalType match {
+      case (STRING, doc) => SchemaType(StringType, nullable = false, doc)
+      case (BOOLEAN, doc) => SchemaType(BooleanType, nullable = false, doc)
+      case (BYTES | FIXED, doc) => avroSchema.getLogicalType match {
         // For FIXED type, if the precision requires more bytes than fixed size, the logical
         // type will be null, which is handled by Avro library.
-        case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false)
-        case _ => SchemaType(BinaryType, nullable = false)
+        case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false, doc)
+        case _ => SchemaType(BinaryType, nullable = false, doc)
       }

-      case DOUBLE => SchemaType(DoubleType, nullable = false)
-      case FLOAT => SchemaType(FloatType, nullable = false)
-      case LONG => avroSchema.getLogicalType match {
-        case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
-        case _ => SchemaType(LongType, nullable = false)
+      case (DOUBLE, doc) => SchemaType(DoubleType, nullable = false, doc)
+      case (FLOAT, doc) => SchemaType(FloatType, nullable = false, doc)
+      case (LONG, doc) => avroSchema.getLogicalType match {
+        case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false, doc)
+        case _ => SchemaType(LongType, nullable = false, doc)
       }

-      case ENUM => SchemaType(StringType, nullable = false)
+      case (ENUM, doc) => SchemaType(StringType, nullable = false, doc)

-      case NULL => SchemaType(NullType, nullable = true)
+      case (NULL, doc) => SchemaType(NullType, nullable = true, doc)

-      case RECORD =>
+      case (RECORD, doc) =>
         if (existingRecordNames.contains(avroSchema.getFullName)) {
           throw new IncompatibleSchemaException(
             s"""
@@ -95,24 +95,25 @@ private[sql] object SchemaConverters {
         val newRecordNames = existingRecordNames + avroSchema.getFullName
         val fields = avroSchema.getFields.asScala.map { f =>
           val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
-          StructField(f.name, schemaType.dataType, schemaType.nullable)
+          val metadata = if (f.doc != null) new MetadataBuilder().putString("comment", f.doc).build() else Metadata.empty
+          StructField(f.name, schemaType.dataType, schemaType.nullable, metadata)
         }

-        SchemaType(StructType(fields.toSeq), nullable = false)
+        SchemaType(StructType(fields.toSeq), nullable = false, doc)

-      case ARRAY =>
+      case (ARRAY, doc) =>
         val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
         SchemaType(
           ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
-          nullable = false)
+          nullable = false, doc)

-      case MAP =>
+      case (MAP, doc) =>
         val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
         SchemaType(
           MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
-          nullable = false)
+          nullable = false, doc)

-      case UNION =>
+      case (UNION, doc) =>
         if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
           // In case of a union with null, eliminate it and make a recursive call
           val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
@@ -126,20 +127,21 @@ private[sql] object SchemaConverters {
           case Seq(t1) =>
             toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
           case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
-            SchemaType(LongType, nullable = false)
+            SchemaType(LongType, nullable = false, doc)
           case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
-            SchemaType(DoubleType, nullable = false)
+            SchemaType(DoubleType, nullable = false, doc)
           case _ =>
             // Convert complex unions to struct types where field names are member0, member1, etc.
             // This is consistent with the behavior when converting between Avro and Parquet.
             val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
               case (s, i) =>
                 val schemaType = toSqlTypeHelper(s, existingRecordNames)
                 // All fields are nullable because only one of them is set at a time
-                StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true)
+                val metadata = if (schemaType.doc.isDefined) new MetadataBuilder().putString("comment", schemaType.doc.get).build() else Metadata.empty
+                StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true, metadata)
             }

-            SchemaType(StructType(fields.toSeq), nullable = false)
+            SchemaType(StructType(fields.toSeq), nullable = false, doc)
         }

       case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
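To see the converter change end to end, a hedged sketch (SchemaConverters and SchemaType are private[sql], so this only compiles from inside the same package; the schema below is made up):

import org.apache.avro.Schema

// Hypothetical Avro schema carrying both a record-level doc and a field-level doc.
val avroSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "User", "doc": "a user record",
    | "fields": [
    |   {"name": "id", "type": "long", "doc": "unique user id"},
    |   {"name": "name", "type": ["null", "string"], "default": null}
    | ]}""".stripMargin)

// The record doc comes back as the third element of SchemaType...
val SchemaType(structType, _, doc) = SchemaConverters.toSqlType(avroSchema)
assert(doc.contains("a user record"))

// ...and each field doc lands in the StructField metadata under "comment".
assert(structType("id").metadata.getString("comment") == "unique user id")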
@@ -48,7 +48,7 @@ class TestAvroSerDe extends SparkAdapterSupport {
   }

   val avroSchema = HoodieMetadataColumnStats.SCHEMA$
-  val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(avroSchema)
+  val SchemaType(catalystSchema, _, _) = SchemaConverters.toSqlType(avroSchema)

   val deserializer = sparkAdapter.createAvroDeserializer(avroSchema, catalystSchema)
   val serializer = sparkAdapter.createAvroSerializer(catalystSchema, avroSchema, nullable = false)
@@ -28,7 +28,7 @@ class TestSchemaConverters {
   def testAvroUnionConversion(): Unit = {
     val originalAvroSchema = HoodieMetadataColumnStats.SCHEMA$

-    val SchemaType(convertedStructType, _) = SchemaConverters.toSqlType(originalAvroSchema)
+    val SchemaType(convertedStructType, _, _) = SchemaConverters.toSqlType(originalAvroSchema)
     val convertedAvroSchema = SchemaConverters.toAvroType(convertedStructType)

     // NOTE: Here we're validating that converting Avro -> Catalyst and Catalyst -> Avro are inverse
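One asymmetry worth noting, visible in the signatures quoted above rather than asserted by this diff: toAvroType takes no doc argument, so the inverse-conversion check here covers types and nullability, while a doc recovered by toSqlType has no direct path back into the regenerated Avro schema.

// Sketch of the asymmetry, using only the signatures shown in this diff:
val SchemaType(structType, _, doc) = SchemaConverters.toSqlType(originalAvroSchema)
val regenerated = SchemaConverters.toAvroType(structType) // no parameter accepts `doc`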