[Hudi] Flesh out tests and update column type support #3339

Merged
Changes from all commits
@@ -16,19 +16,29 @@

package org.apache.spark.sql.delta.hudi

import org.apache.avro.Schema
import java.io.{IOException, UncheckedIOException}
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.{ChronoField, ChronoUnit}
import java.util
import java.util.{Collections, Properties}
import java.util.stream.Collectors

import scala.collection.JavaConverters._
import scala.collection.mutable._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hudi.HudiSchemaUtils._
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.avro.Schema
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.avro.model.HoodieActionInstant
import org.apache.hudi.avro.model.HoodieCleanFileInfo
import org.apache.hudi.avro.model.HoodieCleanerPlan
import org.apache.hudi.avro.model.HoodieCleanFileInfo
import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.HoodieTimelineArchiver
import org.apache.hudi.client.WriteStatus
@@ -40,9 +50,9 @@ import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieBaseFile, HoodieCl
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, HoodieTimeline, TimelineMetadataUtils}
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.{MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH, SECS_INSTANT_ID_LENGTH, SECS_INSTANT_TIMESTAMP_FORMAT}
import org.apache.hudi.common.util.{Option => HudiOption}
import org.apache.hudi.common.util.CleanerUtils
import org.apache.hudi.common.util.ExternalFilePathUtil
import org.apache.hudi.common.util.{Option => HudiOption}
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.config.HoodieArchivalConfig
import org.apache.hudi.config.HoodieCleanConfig
@@ -53,16 +63,6 @@ import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
import org.apache.hudi.table.HoodieJavaTable
import org.apache.hudi.table.action.clean.CleanPlanner

import java.io.{IOException, UncheckedIOException}
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.{ChronoField, ChronoUnit}
import java.util
import java.util.stream.Collectors
import java.util.{Collections, Properties}
import collection.mutable._
import scala.collection.JavaConverters._

/**
* Used to prepare (convert) and then commit a set of Delta actions into the Hudi table located
* at the same path as [[postCommitSnapshot]]
@@ -16,25 +16,30 @@

package org.apache.spark.sql.delta.hudi

import java.io.{IOException, UncheckedIOException}
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.OptimisticTransactionImpl
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.UniversalFormatConverter
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hooks.HudiConverterHook
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hooks.HudiConverterHook
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta._

import java.io.{IOException, UncheckedIOException}
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

object HudiConverter {
/**
@@ -16,13 +16,15 @@

package org.apache.spark.sql.delta.hudi

import org.apache.avro.{LogicalTypes, Schema}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.types._

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.avro.{LogicalTypes, Schema}

import org.apache.spark.sql.types._

object HudiSchemaUtils extends DeltaLogging {

/////////////////
@@ -73,7 +75,8 @@ object HudiSchemaUtils extends DeltaLogging {
private def convertAtomic[E <: DataType](elem: E, isNullable: Boolean) = elem match {
case StringType => finalizeSchema(Schema.create(Schema.Type.STRING), isNullable)
case LongType => finalizeSchema(Schema.create(Schema.Type.LONG), isNullable)
case IntegerType | ShortType => finalizeSchema(Schema.create(Schema.Type.INT), isNullable)
case IntegerType => finalizeSchema(
Schema.create(Schema.Type.INT), isNullable)
case FloatType => finalizeSchema(Schema.create(Schema.Type.FLOAT), isNullable)
case DoubleType => finalizeSchema(Schema.create(Schema.Type.DOUBLE), isNullable)
case d: DecimalType => finalizeSchema(LogicalTypes.decimal(d.precision, d.scale)
@@ -84,8 +87,6 @@
LogicalTypes.date.addToSchema(Schema.create(Schema.Type.INT)), isNullable)
case TimestampType => finalizeSchema(
LogicalTypes.timestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
case TimestampNTZType => finalizeSchema(
LogicalTypes.localTimestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
case _ => throw new UnsupportedOperationException(s"Could not convert atomic type $elem")
}
}
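For reference, the atomic branch above is a direct Spark-to-Avro type mapping. The standalone sketch below restates that mapping for illustration only, assuming just the cases visible in this hunk; it is not the actual HudiSchemaUtils code and omits the nullable-union wrapping done by finalizeSchema as well as the decimal branch:

import org.apache.avro.{LogicalTypes, Schema}
import org.apache.spark.sql.types._

object AtomicMappingSketch {
  // Maps a non-nullable Spark atomic type to the Avro schema Hudi will see.
  def toAvro(dt: DataType): Schema = dt match {
    case StringType    => Schema.create(Schema.Type.STRING)
    case IntegerType   => Schema.create(Schema.Type.INT)
    case LongType      => Schema.create(Schema.Type.LONG)
    case FloatType     => Schema.create(Schema.Type.FLOAT)
    case DoubleType    => Schema.create(Schema.Type.DOUBLE)
    // DATE becomes an Avro int annotated with the `date` logical type.
    case DateType      => LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))
    // TIMESTAMP becomes an Avro long annotated with `timestamp-micros`.
    case TimestampType =>
      LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))
    // ShortType, ByteType, and TimestampNTZType are deliberately absent: after this
    // change they are rejected before conversion instead of being silently widened.
    case other => throw new UnsupportedOperationException(s"Could not convert atomic type $other")
  }
}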
@@ -16,16 +16,15 @@

package org.apache.spark.sql.delta.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, HoodieTimelineTimeZone, HoodieDeltaWriteStat}
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieDeltaWriteStat, HoodieTableType, HoodieTimelineTimeZone}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ExternalFilePathUtil
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.storage.StorageConfiguration
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.metering.DeltaLogging

object HudiTransactionUtils extends DeltaLogging {

@@ -16,8 +16,21 @@

package org.apache.spark.sql.delta.hudi

import java.io.File
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors

import scala.collection.JavaConverters

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
@@ -26,23 +39,12 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage}

import org.apache.spark.SparkContext
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, RemoveFile}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{ByteType, IntegerType, ShortType, StructField, StructType}
import org.apache.spark.util.{ManualClock, Utils}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import java.io.File
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors
import scala.collection.JavaConverters

class ConvertToHudiSuite extends QueryTest with Eventually {

@@ -150,6 +152,21 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
}
}

test("Enabling Delete Vector After Hudi Enabled Already Throws Exception") {
intercept[DeltaUnsupportedOperationException] {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING) USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
_sparkSession.sql(
s"""ALTER TABLE `$testTableName` SET TBLPROPERTIES (
| 'delta.enableDeletionVectors' = true
|)""".stripMargin)
}
}

test(s"Conversion behavior for lists") {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 ARRAY<INT>) USING DELTA
@@ -201,6 +218,22 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
verifyFilesAndSchemaMatch()
}

test(s"Conversion behavior for nested structs") {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 STRUCT<field1: INT, field2: STRING,
|field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
|USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
_sparkSession.sql(
s"INSERT INTO `$testTableName` VALUES (named_struct('field1', 1, 'field2', 'hello', " +
"'field3', named_struct('field4', 2, 'field5', 3, 'field6', 'world')))"
)
verifyFilesAndSchemaMatch()
}

test("validate Hudi timeline archival and cleaning") {
val testOp = Truncate()
withDefaultTablePropsInSQLConf(true, {
@@ -213,7 +246,8 @@
val file = AddFile(i.toString + ".parquet", Map.empty, 1, 1, true) :: Nil
val delete: Seq[Action] = if (i > 1) {
val timestamp = startTime + (System.currentTimeMillis() - actualTestStartTime)
RemoveFile((i - 1).toString + ".parquet", Some(timestamp), true) :: Nil
val prevFile = AddFile((i - 1).toString + ".parquet", Map.empty, 1, 1, true)
prevFile.removeWithTimestamp(timestamp) :: Nil
} else {
Nil
}
@@ -241,7 +275,9 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
| col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
| col9 STRUCT<field1: INT, field2: STRING>)
| col9 BINARY, col10 DECIMAL(5, 2),
| col11 STRUCT<field1: INT, field2: STRING,
| field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
| USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
@@ -250,11 +286,25 @@
val nowSeconds = Instant.now().getEpochSecond
_sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (123, true, "
+ s"date(from_unixtime($nowSeconds)), 32.1, 1.23, 456, 'hello world', "
+ s"timestamp(from_unixtime($nowSeconds)), "
+ s"named_struct('field1', 789, 'field2', 'hello'))")
+ s"timestamp(from_unixtime($nowSeconds)), X'1ABF', -999.99,"
+ s"STRUCT(1, 'hello', STRUCT(2, 3, 'world')))")
verifyFilesAndSchemaMatch()
}

for (invalidType <- Seq("SMALLINT", "TINYINT", "TIMESTAMP_NTZ", "VOID")) {
test(s"Unsupported Type $invalidType Throws Exception") {
intercept[DeltaUnsupportedOperationException] {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 $invalidType) USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
}
}
}
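These tests exercise the schema guard added to UniversalFormat.scala further down in this diff, which now rejects ByteType, ShortType, and TimestampNTZType (in addition to NullType) before any Hudi conversion starts. Below is a minimal standalone sketch of such a recursive check, for illustration only; the real code delegates to SchemaUtils.findAnyTypeRecursively and throws DeltaErrors.uniFormHudiSchemaCompat:

import org.apache.spark.sql.types._

object HudiTypeCheckSketch {
  // Returns the first Hudi-incompatible type found anywhere in the schema, if any.
  def firstUnsupported(dt: DataType): Option[DataType] = dt match {
    case NullType | ByteType | ShortType | TimestampNTZType => Some(dt)
    case s: StructType => s.fields.flatMap(f => firstUnsupported(f.dataType)).headOption
    case a: ArrayType  => firstUnsupported(a.elementType)
    case m: MapType    => firstUnsupported(m.keyType).orElse(firstUnsupported(m.valueType))
    case _             => None
  }
}

// Example: a SMALLINT (ShortType) nested inside a struct is still flagged.
// firstUnsupported(new StructType().add("s", new StructType().add("x", ShortType)))
// returns Some(ShortType)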


test("all batches of actions are converted") {
withSQLConf(
DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT.key -> "3"
@@ -322,6 +372,7 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
s"Files do not match.\nExpected: $expectedFiles\nActual: $paths")
// Assert schemas are equal
val expectedSchema = deltaDF.schema

assert(hudiSchemaAsStruct.equals(expectedSchema),
s"Schemas do not match.\nExpected: $expectedSchema\nActual: $hudiSchemaAsStruct")
}
@@ -352,7 +403,6 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
.appName("UniformSession")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.types.{ByteType, CalendarIntervalType, NullType, ShortType, TimestampNTZType}

/**
* Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature).
@@ -103,7 +103,8 @@ object UniversalFormat extends DeltaLogging {
throw DeltaErrors.uniFormHudiDeleteVectorCompat()
}
SchemaUtils.findAnyTypeRecursively(newestMetadata.schema) { f =>
f.isInstanceOf[NullType]
f.isInstanceOf[NullType] || f.isInstanceOf[ByteType] || f.isInstanceOf[ShortType] ||
f.isInstanceOf[TimestampNTZType]
} match {
case Some(unsupportedType) =>
throw DeltaErrors.uniFormHudiSchemaCompat(unsupportedType)