diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/appsettings/AppSettings.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/appsettings/AppSettings.scala index 8acc796a..cd61a86a 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/appsettings/AppSettings.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/appsettings/AppSettings.scala @@ -4,7 +4,7 @@ import org.apache.log4j.Level import org.apache.spark.SparkConf import ru.raiffeisen.checkita.config.IO.readAppConfig import ru.raiffeisen.checkita.config.Parsers._ -import ru.raiffeisen.checkita.config.appconf.{AppConfig, EmailConfig, MattermostConfig, StorageConfig} +import ru.raiffeisen.checkita.config.appconf.{AppConfig, EmailConfig, MattermostConfig, StorageConfig, StreamConfig} import ru.raiffeisen.checkita.utils.Common.{paramsSeqToMap, prepareConfig} import ru.raiffeisen.checkita.utils.ResultUtils._ import ru.raiffeisen.checkita.utils.EnrichedDT @@ -27,6 +27,7 @@ import scala.util.Try * @param storageConfig Configuration of connection to Data Quality Storage * @param emailConfig Configuration of connection to SMTP server * @param mattermostConfig Configuration of connection to Mattermost API + * @param streamConfig Streaming settings (used in streaming applications only) * @param sparkConf Spark configuration parameters * @param isLocal Boolean flag indicating whether spark application must be run locally. * @param isShared Boolean flag indicating whether spark application running within shared spark context. @@ -47,6 +48,7 @@ final case class AppSettings( storageConfig: Option[StorageConfig], emailConfig: Option[EmailConfig], mattermostConfig: Option[MattermostConfig], + streamConfig: StreamConfig, sparkConf: SparkConf, isLocal: Boolean, isShared: Boolean, @@ -124,6 +126,7 @@ object AppSettings { appConfig.storage, appConfig.email, appConfig.mattermost, + appConfig.streaming, conf, isLocal, isShared, @@ -190,6 +193,7 @@ object AppSettings { defaultAppConf.storage, defaultAppConf.email, defaultAppConf.mattermost, + defaultAppConf.streaming, new SparkConf(), isLocal = false, isShared = false, diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Enums.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Enums.scala index 7fceb40c..61cc6938 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Enums.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Enums.scala @@ -1,6 +1,7 @@ package ru.raiffeisen.checkita.config import enumeratum.{Enum, EnumEntry} +import ru.raiffeisen.checkita.config.Parsers.idParser import scala.collection.immutable @@ -27,6 +28,7 @@ object Enums { */ sealed trait KafkaTopicFormat extends EnumEntry object KafkaTopicFormat extends Enum[KafkaTopicFormat] { + case object String extends KafkaTopicFormat case object Xml extends KafkaTopicFormat case object Json extends KafkaTopicFormat case object Avro extends KafkaTopicFormat @@ -34,6 +36,31 @@ object Enums { override val values: immutable.IndexedSeq[KafkaTopicFormat] = findValues } + /** + * Supported Kafka windowing types, that control how windows are build: + * - based on processing time: current timestamp when rows is processed; + * - based on event time: kafka message creation timestamp; + * - custom time: arbitrary timestamp column from kafka message defined by user. + * @note this is not an enumeration since custom time type requires user-defined column name. 
+ */ + sealed trait KafkaWindowing { val windowBy: String } + case object ProcessingTime extends KafkaWindowing { val windowBy: String = "processingTime" } + case object EventTime extends KafkaWindowing { val windowBy: String = "eventTime" } + case class CustomTime(column: String) extends KafkaWindowing { val windowBy: String = s"custom($column)" } + object KafkaWindowing { + private val customPattern = """^custom\((.+)\)$""".r + def apply(s: String): KafkaWindowing = s match { + case "processingTime" => ProcessingTime + case "eventTime" => EventTime + case custom if customPattern.pattern.matcher(custom).matches() => + val customPattern(columnName) = custom + val _ = idParser.parseTableIdentifier(columnName) // verify if columnName is a valid Spark SQL identifier + CustomTime(columnName) + case other => throw new IllegalArgumentException(s"Wrong Kafka windowing type: $other") + } + } + + /** * Supported file types for FileTypeLoadCheck */ diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Implicits.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Implicits.scala index dff288d9..a8383500 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Implicits.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Implicits.scala @@ -1,45 +1,28 @@ package ru.raiffeisen.checkita.config import org.apache.spark.sql.Column -import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.functions.expr -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType import org.apache.spark.storage.StorageLevel import pureconfig.generic.{FieldCoproductHint, ProductHint} import pureconfig.{CamelCase, ConfigConvert, ConfigFieldMapping} import ru.raiffeisen.checkita.config.Enums._ +import ru.raiffeisen.checkita.config.Parsers.idParser import ru.raiffeisen.checkita.config.RefinedTypes.{DateFormat, ID} import ru.raiffeisen.checkita.config.jobconf.Outputs.FileOutputConfig import ru.raiffeisen.checkita.config.jobconf.Schemas.SchemaConfig import ru.raiffeisen.checkita.config.jobconf.Sources.{FileSourceConfig, VirtualSourceConfig} -import ru.raiffeisen.checkita.utils.SparkUtils.{toDataType, toStorageLvlString} +import ru.raiffeisen.checkita.utils.SparkUtils.{DurationOps, toDataType, toStorageLvlString} import java.time.ZoneId -import java.time.format.DateTimeFormatter import java.util.TimeZone -import scala.util.{Failure, Success, Try} +import scala.concurrent.duration.Duration /** * Implicit pureconfig hints and converters for specific types used in Job Configuration. */ object Implicits { - /** SparkSqlParser used to validate IDs - * @note SparkSqlParser has evolved in Spark 3.1 to use active SQLConf. - * Thus, its constructor became parameterless. - * Therefore, in order to instantiate SparkSqlParser it is required - * to get constructor specific to current Spark version. 
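A minimal usage sketch (not part of the patch) of the KafkaWindowing parser defined above; the column name created_at is hypothetical:

import ru.raiffeisen.checkita.config.Enums.{CustomTime, EventTime, KafkaWindowing, ProcessingTime}

// the three accepted forms of the 'windowBy' setting:
KafkaWindowing("processingTime")      // ProcessingTime
KafkaWindowing("eventTime")           // EventTime
KafkaWindowing("custom(created_at)")  // CustomTime("created_at"); the column name is checked via idParser
// round trip used by the pureconfig converter in Implicits:
CustomTime("created_at").windowBy     // "custom(created_at)"
// anything else fails fast:
// KafkaWindowing("sometime")         // IllegalArgumentException: Wrong Kafka windowing type: sometime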
- */ - private val idParser = { - val parserCls = classOf[SparkSqlParser] - (Try(parserCls.getConstructor()), Try(parserCls.getConstructor(classOf[SQLConf]))) match { - case (Success(constructor), Failure(_)) => constructor.newInstance() - case (Failure(_), Success(constructor)) => constructor.newInstance(new SQLConf) - case _ => throw new NoSuchMethodException("Unable to construct Spark SQL Parser") - } - } - implicit def hint[A]: ProductHint[A] = ProductHint[A]( ConfigFieldMapping(CamelCase, CamelCase), allowUnknownKeys = false @@ -55,10 +38,7 @@ object Implicits { ) implicit val dateFormatConverter: ConfigConvert[DateFormat] = - ConfigConvert[String].xmap[DateFormat]( - pattern => DateFormat(pattern, DateTimeFormatter.ofPattern(pattern)), - dateFormat => dateFormat.pattern - ) + ConfigConvert[String].xmap[DateFormat](DateFormat.fromString, _.pattern) implicit val timeZoneConverter: ConfigConvert[ZoneId] = ConfigConvert[String].xmap[ZoneId]( @@ -69,12 +49,18 @@ object Implicits { implicit val dqStorageTypeConverter: ConfigConvert[DQStorageType] = ConfigConvert[String].xmap[DQStorageType](DQStorageType.withNameInsensitive, _.toString.toLowerCase) - implicit val dqDataTypeReader: ConfigConvert[DataType] = + implicit val dqDataTypeConverter: ConfigConvert[DataType] = ConfigConvert[String].xmap[DataType]( toDataType, t => t.toString.toLowerCase.replace("type", "") ) + implicit val durationTypeConverter: ConfigConvert[Duration] = + ConfigConvert[String].xmap[Duration](Duration(_), _.toShortString) + + implicit val kafkaWindowingConverter: ConfigConvert[KafkaWindowing] = + ConfigConvert[String].xmap[KafkaWindowing](KafkaWindowing(_), _.windowBy) + private val dropRight: String => String => String = dropText => s => s.dropRight(dropText.length).zipWithIndex.map(t => if (t._2 == 0) t._1.toLower else t._1).mkString diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Parsers.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Parsers.scala index 1aa746d7..a135233d 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Parsers.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/Parsers.scala @@ -1,12 +1,31 @@ package ru.raiffeisen.checkita.config import com.typesafe.config.{Config, ConfigFactory} +import org.apache.spark.sql.execution.SparkSqlParser +import org.apache.spark.sql.internal.SQLConf import pureconfig.error.{ConfigReaderFailure, ConvertFailure} import java.io.{File, FileNotFoundException, InputStreamReader} +import scala.util.{Failure, Success, Try} object Parsers { + /** SparkSqlParser used to validate IDs + * + * @note SparkSqlParser has evolved in Spark 3.1 to use active SQLConf. + * Thus, its constructor became parameterless. + * Therefore, in order to instantiate SparkSqlParser it is required + * to get constructor specific to current Spark version. + */ + val idParser: SparkSqlParser = { + val parserCls = classOf[SparkSqlParser] + (Try(parserCls.getConstructor()), Try(parserCls.getConstructor(classOf[SQLConf]))) match { + case (Success(constructor), Failure(_)) => constructor.newInstance() + case (Failure(_), Success(constructor)) => constructor.newInstance(new SQLConf) + case _ => throw new NoSuchMethodException("Unable to construct Spark SQL Parser") + } + } + /** * Type class for Config parsers * @tparam T Type of the input from which the config should be parsed. 
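A small sketch (not part of the patch) of how the relocated idParser can be used to validate identifiers; isValidIdentifier is a hypothetical helper:

import ru.raiffeisen.checkita.config.Parsers.idParser
import scala.util.Try

// parseTableIdentifier throws a ParseException for anything that is not a valid Spark SQL identifier
def isValidIdentifier(name: String): Boolean = Try(idParser.parseTableIdentifier(name)).isSuccess

isValidIdentifier("created_at")  // true
isValidIdentifier("created at")  // false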
diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/RefinedTypes.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/RefinedTypes.scala index 86cfb192..78b979e5 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/RefinedTypes.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/RefinedTypes.scala @@ -57,4 +57,7 @@ object RefinedTypes { * @param formatter - datetime formatter for pattern */ case class DateFormat(pattern: String, @transient formatter: DateTimeFormatter) + object DateFormat { + def fromString(s: String): DateFormat = DateFormat(s, DateTimeFormatter.ofPattern(s)) + } } diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/AppConfig.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/AppConfig.scala index cc307928..acee27e2 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/AppConfig.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/AppConfig.scala @@ -18,6 +18,7 @@ final case class AppConfig( storage: Option[StorageConfig], email: Option[EmailConfig], mattermost: Option[MattermostConfig], + streaming: StreamConfig = StreamConfig(), dateTimeOptions: DateTimeConfig = DateTimeConfig(), enablers: Enablers = Enablers(), defaultSparkOptions: Seq[SparkParam] = Seq.empty, diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/DateTimeConfig.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/DateTimeConfig.scala index f261aa15..91fdbc70 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/DateTimeConfig.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/DateTimeConfig.scala @@ -13,12 +13,6 @@ import ru.raiffeisen.checkita.config.RefinedTypes.DateFormat */ case class DateTimeConfig( timeZone: ZoneId = ZoneId.of("UTC"), - referenceDateFormat: DateFormat = DateFormat( - pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS", - formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS") - ), - executionDateFormat: DateFormat = DateFormat( - pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS", - formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS") - ) + referenceDateFormat: DateFormat = DateFormat.fromString("yyyy-MM-dd'T'HH:mm:ss.SSS"), + executionDateFormat: DateFormat = DateFormat.fromString("yyyy-MM-dd'T'HH:mm:ss.SSS") ) diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/StreamConfig.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/StreamConfig.scala new file mode 100644 index 00000000..7333a064 --- /dev/null +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/appconf/StreamConfig.scala @@ -0,0 +1,20 @@ +package ru.raiffeisen.checkita.config.appconf + +import scala.concurrent.duration.Duration +import java.util.UUID + +/** + * Application-level configuration describing streaming settings + * @param trigger Trigger interval: defines time interval for which micro-batches are collected. + * @param window Window interval: defines tabbing window size used to accumulate metrics. + * @param watermark Watermark level: defines time interval after which late records are no longer processed. 
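A brief sketch (not part of the patch) of the new DateFormat.fromString factory that the DateTimeConfig defaults above now rely on:

import ru.raiffeisen.checkita.config.RefinedTypes.DateFormat
import java.time.LocalDateTime

val refFmt = DateFormat.fromString("yyyy-MM-dd'T'HH:mm:ss.SSS")
refFmt.pattern                                               // "yyyy-MM-dd'T'HH:mm:ss.SSS"
refFmt.formatter.format(LocalDateTime.of(2023, 1, 1, 0, 0))  // "2023-01-01T00:00:00.000"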
+ */ +case class StreamConfig( + trigger: Duration = Duration("10s"), + window: Duration = Duration("10m"), + watermark: Duration = Duration("5m") + ) { + // random column names are generated to be used for windowing: + lazy val windowTsCol: String = UUID.randomUUID.toString.replace("-", "") + lazy val eventTsCol: String = UUID.randomUUID.toString.replace("-", "") +} diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Files.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Files.scala index 43d5e587..ae6e9a49 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Files.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Files.scala @@ -1,7 +1,7 @@ package ru.raiffeisen.checkita.config.jobconf import eu.timepit.refined.types.string.NonEmptyString -import ru.raiffeisen.checkita.config.RefinedTypes.URI +import ru.raiffeisen.checkita.config.RefinedTypes.{ID, URI} /** * @note General note on working with files in Checkita Framework: @@ -19,9 +19,11 @@ object Files { /** * Base trait for all types of configurations that refer to a file. * All such configurations must contain a path to a file. + * In addition, optional schemaId can be provided to point to a schema used for data reading. */ trait FileConfig { val path: URI + val schema: Option[ID] } /** @@ -37,9 +39,13 @@ object Files { /** * Base trait for Avro files. Configuration for avro files may contain reference to avro schema ID. */ - trait AvroFileConfig extends FileConfig { - val schema: Option[NonEmptyString] - } + trait AvroFileConfig extends FileConfig + + /** + * Base trait for fixed-width text files. + * For such files configuration must contain reference explicit fixed (full or short) schema ID + */ + trait FixedFileConfig extends FileConfig /** * Base trait for delimited text files such as CSV or TSV. @@ -55,14 +61,6 @@ object Files { val quote: NonEmptyString val escape: NonEmptyString val header: Boolean - val schema: Option[NonEmptyString] } - /** - * Base trait for fixed-width text files. 
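A minimal sketch (not part of the patch) of the default streaming settings defined above:

import ru.raiffeisen.checkita.config.appconf.StreamConfig
import scala.concurrent.duration.Duration

val defaults = StreamConfig()
defaults.trigger            // 10 seconds
defaults.window             // 10 minutes
defaults.watermark          // 5 minutes
// per-instance random column names used to carry window/event timestamps through the stream:
defaults.windowTsCol.length // 32 (UUID with dashes removed)

val custom = StreamConfig(trigger = Duration("30s"), window = Duration("1h"), watermark = Duration("15m"))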
- * For such files configuration must contain reference explicit fixed (full or short) schema ID - */ - trait FixedFileConfig extends FileConfig { - val schema: NonEmptyString - } } diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/JobConfig.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/JobConfig.scala index 937f9d5d..f2fba983 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/JobConfig.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/JobConfig.scala @@ -1,12 +1,12 @@ package ru.raiffeisen.checkita.config.jobconf -import ru.raiffeisen.checkita.config.RefinedTypes.{ID, SparkParam} +import ru.raiffeisen.checkita.config.RefinedTypes.ID import ru.raiffeisen.checkita.config.jobconf.Checks.ChecksConfig import ru.raiffeisen.checkita.config.jobconf.Connections.ConnectionsConfig import ru.raiffeisen.checkita.config.jobconf.LoadChecks.LoadChecksConfig import ru.raiffeisen.checkita.config.jobconf.Metrics.MetricsConfig import ru.raiffeisen.checkita.config.jobconf.Schemas.SchemaConfig -import ru.raiffeisen.checkita.config.jobconf.Sources.{SourcesConfig, VirtualSourceConfig} +import ru.raiffeisen.checkita.config.jobconf.Sources.{SourcesConfig, StreamSourcesConfig, VirtualSourceConfig} import ru.raiffeisen.checkita.config.jobconf.Targets.TargetsConfig /** @@ -14,7 +14,8 @@ import ru.raiffeisen.checkita.config.jobconf.Targets.TargetsConfig * @param jobId Job ID * @param connections Connections to external data systems (RDBMS, Message Brokers, etc.) * @param schemas Various schema definitions - * @param sources Data sources processed within current job. + * @param sources Data sources processed within current job (only applicable to batch jobs). + * @param streams Stream sources processed within current job (only applicable to streaming jobs). * @param virtualSources Virtual sources to be created from basic sources * @param loadChecks Load checks to be performed on data sources before reading data itself * @param metrics Metrics to be calculated for data sources @@ -26,6 +27,7 @@ final case class JobConfig( connections: Option[ConnectionsConfig], schemas: Seq[SchemaConfig] = Seq.empty, sources: Option[SourcesConfig], + streams: Option[StreamSourcesConfig], virtualSources: Seq[VirtualSourceConfig] = Seq.empty, loadChecks: Option[LoadChecksConfig], metrics: Option[MetricsConfig], diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Outputs.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Outputs.scala index f580ae11..c5db1bae 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Outputs.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Outputs.scala @@ -5,7 +5,7 @@ import eu.timepit.refined.api.Refined import eu.timepit.refined.collection.NonEmpty import eu.timepit.refined.types.string.NonEmptyString import ru.raiffeisen.checkita.config.Enums.TemplateFormat -import ru.raiffeisen.checkita.config.RefinedTypes.{Email, MMRecipient, SparkParam, URI} +import ru.raiffeisen.checkita.config.RefinedTypes.{Email, ID, MMRecipient, SparkParam, URI} import ru.raiffeisen.checkita.config.jobconf.Files._ @@ -45,7 +45,7 @@ object Outputs { header: Boolean = false ) extends FileOutputConfig with DelimitedFileConfig { // Schema is not required as input parameter as it is enforced on write. 
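A short sketch (not part of the patch) of how a job could pick its inputs from the new streams section versus the batch sources section; inputSources and the isStreaming flag are hypothetical:

import ru.raiffeisen.checkita.config.jobconf.JobConfig
import ru.raiffeisen.checkita.config.jobconf.Sources.SourceConfig

def inputSources(job: JobConfig, isStreaming: Boolean): Seq[SourceConfig] =
  if (isStreaming) job.streams.map(_.getAllSources).getOrElse(Seq.empty)
  else job.sources.map(_.getAllSources).getOrElse(Seq.empty)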
- val schema: Option[NonEmptyString] = None + val schema: Option[ID] = None } /** @@ -56,7 +56,7 @@ object Outputs { final case class AvroFileOutputConfig( path: URI ) extends FileOutputConfig with AvroFileConfig { - val schema: Option[NonEmptyString] = None + val schema: Option[ID] = None } /** @@ -66,7 +66,9 @@ object Outputs { */ final case class OrcFileOutputConfig( path: URI - ) extends FileOutputConfig with OrcFileConfig + ) extends FileOutputConfig with OrcFileConfig { + val schema: Option[ID] = None + } /** * Parquet file output configuration @@ -75,7 +77,9 @@ object Outputs { */ final case class ParquetFileOutputConfig( path: URI - ) extends FileOutputConfig with ParquetFileConfig + ) extends FileOutputConfig with ParquetFileConfig { + val schema: Option[ID] = None + } /** diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Sources.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Sources.scala index 93a4d295..d1ea72f6 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Sources.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/jobconf/Sources.scala @@ -6,7 +6,7 @@ import eu.timepit.refined.collection.NonEmpty import eu.timepit.refined.types.string.NonEmptyString import org.apache.spark.sql.Column import org.apache.spark.storage.StorageLevel -import ru.raiffeisen.checkita.config.Enums.{KafkaTopicFormat, SparkJoinType} +import ru.raiffeisen.checkita.config.Enums.{KafkaTopicFormat, KafkaWindowing, ProcessingTime, SparkJoinType} import ru.raiffeisen.checkita.config.RefinedTypes._ import ru.raiffeisen.checkita.config.jobconf.Files._ import ru.raiffeisen.checkita.config.jobconf.Outputs._ @@ -21,6 +21,7 @@ object Sources { sealed abstract class SourceConfig { val id: ID val keyFields: Seq[NonEmptyString] + val streamable: Boolean } /** @@ -33,11 +34,13 @@ object Sources { */ final case class TableSourceConfig( id: ID, - connection: NonEmptyString, + connection: ID, table: Option[NonEmptyString], query: Option[NonEmptyString], keyFields: Seq[NonEmptyString] = Seq.empty - ) extends SourceConfig + ) extends SourceConfig { + val streamable: Boolean = false + } /** * Configuration for Hive Table partition values to read. @@ -67,7 +70,9 @@ object Sources { table: NonEmptyString, partitions: Seq[HivePartition] = Seq.empty, keyFields: Seq[NonEmptyString] = Seq.empty - ) extends SourceConfig + ) extends SourceConfig { + val streamable: Boolean = false + } /** * Kafka source configuration @@ -76,23 +81,44 @@ object Sources { * @param connection Connection ID (must be a Kafka Connection) * @param topics Sequence of topics to read * @param topicPattern Pattern that defined topics to read - * @param format Topic message format - * @param startingOffsets Json-string defining starting offsets (default: "earliest") - * @param endingOffsets Json-string defining ending offsets (default: "latest") + * @param startingOffsets Json-string defining starting offsets. + * If none is set, then "earliest" is used in batch jobs and "latest is used in streaming jobs. + * @param endingOffsets Json-string defining ending offset. Applicable only to batch jobs. + * If none is set then "latest" is used. + * @param windowBy Source of timestamp used to build windows. Applicable only for streaming jobs! + * Default: processingTime - uses current timestamp at the moment when Spark processes row. + * Other options are: + * - eventTime - uses Kafka message creation timestamp. 
+ * - customTime(columnName) - uses arbitrary user-defined column from kafka message + * (column must be of TimestampType) + * @param keyFormat Message key format. Default: string. + * @param valueFormat Message value format. Default: string. + * @param keySchema Schema ID. Used to parse message key. Ignored when keyFormat is string. + * Mandatory for other formats. + * @param valueSchema Schema ID. Used to parse message value. Ignored when valueFormat is string. + * Mandatory for other formats. + * Used to parse kafka message value. * @param options Sequence of additional Kafka options * @param keyFields Sequence of key fields (columns that identify data row) */ final case class KafkaSourceConfig( id: ID, - connection: NonEmptyString, + connection: ID, topics: Seq[NonEmptyString] = Seq.empty, topicPattern: Option[NonEmptyString], format: KafkaTopicFormat, - startingOffsets: NonEmptyString = "earliest", - endingOffsets: NonEmptyString = "latest", + startingOffsets: Option[NonEmptyString], // earliest for batch, latest for stream + endingOffsets: Option[NonEmptyString], // latest for batch, ignored for stream. + windowBy: KafkaWindowing = ProcessingTime, + keyFormat: KafkaTopicFormat = KafkaTopicFormat.String, + valueFormat: KafkaTopicFormat = KafkaTopicFormat.String, + keySchema: Option[ID] = None, + valueSchema: Option[ID] = None, options: Seq[SparkParam] = Seq.empty, keyFields: Seq[NonEmptyString] = Seq.empty - ) extends SourceConfig + ) extends SourceConfig { + val streamable: Boolean = true + } /** * Base class for file source configurations. @@ -110,9 +136,11 @@ object Sources { final case class FixedFileSourceConfig( id: ID, path: URI, - schema: NonEmptyString, + schema: Option[ID], keyFields: Seq[NonEmptyString] = Seq.empty - ) extends FileSourceConfig with FixedFileConfig + ) extends FileSourceConfig with FixedFileConfig { + val streamable: Boolean = true + } /** * Delimited file source configuration @@ -129,54 +157,66 @@ object Sources { final case class DelimitedFileSourceConfig( id: ID, path: URI, - schema: Option[NonEmptyString], + schema: Option[ID], delimiter: NonEmptyString = ",", quote: NonEmptyString = "\"", escape: NonEmptyString = "\\", header: Boolean = false, keyFields: Seq[NonEmptyString] = Seq.empty - ) extends FileSourceConfig with DelimitedFileConfig + ) extends FileSourceConfig with DelimitedFileConfig { + val streamable: Boolean = true + } /** * Avro file source configuration * * @param id Source ID * @param path Path to file - * @param schema Schema ID (must be and Avro Schema) + * @param schema Schema ID * @param keyFields Sequence of key fields (columns that identify data row) */ final case class AvroFileSourceConfig( id: ID, path: URI, - schema: Option[NonEmptyString], + schema: Option[ID], keyFields: Seq[NonEmptyString] = Seq.empty - ) extends FileSourceConfig with AvroFileConfig + ) extends FileSourceConfig with AvroFileConfig { + val streamable: Boolean = true + } /** * Orc file source configuration * * @param id Source ID * @param path Path to file + * @param schema Schema ID * @param keyFields Sequence of key fields (columns that identify data row) */ final case class OrcFileSourceConfig( id: ID, path: URI, + schema: Option[ID], keyFields: Seq[NonEmptyString] = Seq.empty - ) extends FileSourceConfig with OrcFileConfig + ) extends FileSourceConfig with OrcFileConfig { + val streamable: Boolean = true + } /** * Parquet file source configuration * * @param id Source ID * @param path Path to file + * @param schema Schema ID * @param keyFields Sequence of key 
fields (columns that identify data row) */ final case class ParquetFileSourceConfig( id: ID, path: URI, + schema: Option[ID], keyFields: Seq[NonEmptyString] = Seq.empty - ) extends FileSourceConfig with ParquetFileConfig + ) extends FileSourceConfig with ParquetFileConfig { + val streamable: Boolean = true + } /** * Custom source configuration: @@ -195,7 +235,9 @@ object Sources { schema: Option[ID], options: Seq[SparkParam] = Seq.empty, keyFields: Seq[NonEmptyString] = Seq.empty - ) extends SourceConfig + ) extends SourceConfig { + val streamable: Boolean = false + } /** * Base class for all virtual source configurations. @@ -210,6 +252,7 @@ object Sources { val parents: Seq[String] // additional validation will be imposed on the required number // of parent sources depending on virtual source type. + val streamable: Boolean = true } /** @@ -344,5 +387,16 @@ object Sources { def getAllSources: Seq[SourceConfig] = this.productIterator.toSeq.flatMap(_.asInstanceOf[Seq[Any]]).map(_.asInstanceOf[SourceConfig]) } - + + /** + * Data Quality job configuration section describing streams + * + * @param kafka Sequence of streams based on Kafka topics + */ + final case class StreamSourcesConfig( + kafka: Seq[KafkaSourceConfig] = Seq.empty, + ) { + def getAllSources: Seq[SourceConfig] = + this.productIterator.toSeq.flatMap(_.asInstanceOf[Seq[Any]]).map(_.asInstanceOf[SourceConfig]) + } } diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/validation/PreValidation.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/validation/PreValidation.scala index 033f7258..96a8321e 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/validation/PreValidation.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/config/validation/PreValidation.scala @@ -3,11 +3,10 @@ package ru.raiffeisen.checkita.config.validation import pureconfig.generic.semiauto.{deriveReader, deriveWriter} import pureconfig.{ConfigReader, ConfigWriter} import ru.raiffeisen.checkita.config.Enums.TrendCheckRule +import ru.raiffeisen.checkita.config.appconf.StreamConfig import ru.raiffeisen.checkita.config.jobconf.Checks._ import ru.raiffeisen.checkita.config.jobconf.MetricParams.LevenshteinDistanceParams -import ru.raiffeisen.checkita.config.jobconf.Sources.{ - DelimitedFileSourceConfig, KafkaSourceConfig, TableSourceConfig -} +import ru.raiffeisen.checkita.config.jobconf.Sources.{DelimitedFileSourceConfig, FixedFileSourceConfig, KafkaSourceConfig, TableSourceConfig} import javax.validation.ValidationException import scala.concurrent.duration.Duration @@ -23,6 +22,47 @@ import ru.raiffeisen.checkita.config.Implicits._ */ object PreValidation { + /** + * Implicit StreamConfig reader validation + */ + implicit val validateStreamConfigReader: ConfigReader[StreamConfig] = + deriveReader[StreamConfig].ensure( + s => s.trigger < s.window, + _ => "For proper streaming processing, micro-batch trigger interval must be less than window interval." + ) + + /** + * Implicit StreamConfig writer validation + */ + implicit val validateStreamConfigWriter: ConfigWriter[StreamConfig] = + deriveWriter[StreamConfig].contramap[StreamConfig] { x => + if (x.trigger < x.window) x + else throw new ValidationException( + s"Error during writing ${x.toString}: " + + "for proper streaming processing, micro-batch trigger interval must be less than window interval." 
+ ) + } + + /** + * Implicit FixedFileSource reader validation + */ + implicit val validateFixedFileSourceReader: ConfigReader[FixedFileSourceConfig] = + deriveReader[FixedFileSourceConfig].ensure( + _.schema.nonEmpty, + _ => "For fixed-width files schema must always be specified." + ) + + /** + * Implicit FixedFileSource writer validation + */ + implicit val validateFixedFileSourceWriter: ConfigWriter[FixedFileSourceConfig] = + deriveWriter[FixedFileSourceConfig].contramap[FixedFileSourceConfig] { x => + if (x.schema.nonEmpty) x + else throw new ValidationException( + s"Error during writing ${x.toString}: for fixed-width files schema must always be specified." + ) + } + /** * Ensure that schema for delimited files is either read from header or from explicit schema. * @param f Parsed delimited file source configuration diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/DQConnection.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/DQConnection.scala index 8282f988..b17dddad 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/DQConnection.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/DQConnection.scala @@ -3,6 +3,7 @@ package ru.raiffeisen.checkita.connections import org.apache.spark.sql.{DataFrame, SparkSession} import ru.raiffeisen.checkita.appsettings.AppSettings import ru.raiffeisen.checkita.config.jobconf.Sources.SourceConfig +import ru.raiffeisen.checkita.readers.SchemaReaders.SourceSchema import ru.raiffeisen.checkita.utils.ResultUtils.Result /** @@ -24,10 +25,15 @@ abstract class DQConnection { /** * Loads external data into dataframe given a source configuration + * * @param sourceConfig Source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Implicit Map of all explicitly defined schemas (schemaId -> SourceSchema) * @return Spark DataFrame */ - def loadDataframe(sourceConfig: SourceType)(implicit settings: AppSettings, spark: SparkSession): DataFrame + def loadDataFrame(sourceConfig: SourceType) + (implicit settings: AppSettings, + spark: SparkSession, + schemas: Map[String, SourceSchema]): DataFrame } diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/DQStreamingConnection.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/DQStreamingConnection.scala new file mode 100644 index 00000000..f166e981 --- /dev/null +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/DQStreamingConnection.scala @@ -0,0 +1,25 @@ +package ru.raiffeisen.checkita.connections + +import org.apache.spark.sql.{DataFrame, SparkSession} +import ru.raiffeisen.checkita.appsettings.AppSettings +import ru.raiffeisen.checkita.readers.SchemaReaders.SourceSchema + +/** + * Trait to be mix in connections that can read streams. 
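A compact sketch (not part of the patch) of what the two reader validations above enforce:

import ru.raiffeisen.checkita.config.appconf.StreamConfig
import scala.concurrent.duration.Duration

StreamConfig(trigger = Duration("30s"), window = Duration("10m"))  // accepted: trigger < window
StreamConfig(trigger = Duration("15m"), window = Duration("10m"))  // rejected by validateStreamConfigReader
// Likewise, a fixed-width file source with schema = None is rejected by validateFixedFileSourceReader,
// since fixed-width files cannot be parsed without an explicit schema defining column widths.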
+ */ +trait DQStreamingConnection { this: DQConnection => + + /** + * Loads stream into a dataframe given the stream configuration + * + * @param sourceConfig Stream configuration + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Implicit Map of all explicitly defined schemas (schemaId -> SourceSchema) + * @return Spark Streaming DataFrame + */ + def loadDataStream(sourceConfig: SourceType) + (implicit settings: AppSettings, + spark: SparkSession, + schemas: Map[String, SourceSchema]): DataFrame +} diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/jdbc/JdbcConnection.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/jdbc/JdbcConnection.scala index 436466d7..211466a4 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/jdbc/JdbcConnection.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/jdbc/JdbcConnection.scala @@ -5,6 +5,7 @@ import ru.raiffeisen.checkita.appsettings.AppSettings import ru.raiffeisen.checkita.config.jobconf.Connections.JdbcConnectionConfig import ru.raiffeisen.checkita.config.jobconf.Sources.TableSourceConfig import ru.raiffeisen.checkita.connections.DQConnection +import ru.raiffeisen.checkita.readers.SchemaReaders.SourceSchema import ru.raiffeisen.checkita.utils.Common.paramsSeqToMap import ru.raiffeisen.checkita.utils.ResultUtils._ @@ -51,14 +52,17 @@ abstract class JdbcConnection[T <: JdbcConnectionConfig] extends DQConnection { /** * Loads external data into dataframe given a source configuration + * * @param sourceConfig Source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Implicit Map of all explicitly defined schemas (schemaId -> SourceSchema) * @return Spark DataFrame */ - def loadDataframe(sourceConfig: SourceType) + def loadDataFrame(sourceConfig: SourceType) (implicit settings: AppSettings, - spark: SparkSession): DataFrame = { + spark: SparkSession, + schemas: Map[String, SourceSchema]): DataFrame = { val props = getProperties paramsSeqToMap(sparkParams).foreach{ case (k, v) => props.put(k, v) } diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/kafka/KafkaConnection.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/kafka/KafkaConnection.scala index 9c648885..d9cb6df2 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/kafka/KafkaConnection.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/connections/kafka/KafkaConnection.scala @@ -4,22 +4,24 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.apache.spark.sql.expressions.UserDefinedFunction -import org.apache.spark.sql.functions.{col, from_json, udf} -import org.apache.spark.sql.types.StringType -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{LongType, StringType, TimestampType} +import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.json.XML import ru.raiffeisen.checkita.appsettings.AppSettings -import ru.raiffeisen.checkita.config.Enums.KafkaTopicFormat +import 
ru.raiffeisen.checkita.config.Enums.{CustomTime, EventTime, KafkaTopicFormat, ProcessingTime} import ru.raiffeisen.checkita.config.jobconf.Connections.KafkaConnectionConfig import ru.raiffeisen.checkita.config.jobconf.Sources.KafkaSourceConfig -import ru.raiffeisen.checkita.connections.DQConnection +import ru.raiffeisen.checkita.connections.{DQConnection, DQStreamingConnection} +import ru.raiffeisen.checkita.readers.SchemaReaders.SourceSchema import ru.raiffeisen.checkita.utils.Common.paramsSeqToMap import ru.raiffeisen.checkita.utils.ResultUtils._ +import ru.raiffeisen.checkita.utils.SparkUtils.DurationOps import java.util.Properties import scala.util.Try -case class KafkaConnection(config: KafkaConnectionConfig) extends DQConnection { +case class KafkaConnection(config: KafkaConnectionConfig) extends DQConnection with DQStreamingConnection { type SourceType = KafkaSourceConfig val id: String = config.id.value protected val sparkParams: Seq[String] = config.parameters.map(_.value) @@ -31,11 +33,12 @@ case class KafkaConnection(config: KafkaConnectionConfig) extends DQConnection { /** * Gets proper subscribe option for given source configuration. - * @param conf Kafka source configuration + * @param topics List of Kafka topics (can be empty) + * @param topicPattern Topic pattern (optional) * @return Topic(s) subscribe option */ - private def getSubscribeOption(conf: SourceType): (String, String) = - (conf.topics.map(_.value), conf.topicPattern.map(_.value)) match { + private def getSubscribeOption(topics: Seq[String], topicPattern: Option[String]): (String, String) = + (topics, topicPattern) match { case (ts@_ :: _, None) => if (ts.forall(_.contains("@"))) { "assign" -> ts.map(_.split("@")).collect { case Array(topic, partitions) => "\"" + topic + "\":" + partitions @@ -52,7 +55,41 @@ case class KafkaConnection(config: KafkaConnectionConfig) extends DQConnection { "topics must be defined either explicitly as a sequence of topic names or as a topic pattern." ) } - + + /** + * Decodes kafka message key and value columns given their format. + * @param colName Name of the column to decode (either 'key' or 'value') + * @param format Column format to parse + * @param schemaId Schema ID used to parse column of JSON or XML format + * @param schemas Map of all explicitly defined schemas (schemaId -> SourceSchema) + * @return Decoded column + */ + private def decodeColumn(colName: String, format: KafkaTopicFormat, schemaId: Option[String]) + (implicit schemas: Map[String, SourceSchema]): Column = { + + val schemaGetter = (formatStr: String) => { + val keySchemaId = schemaId.getOrElse(throw new IllegalArgumentException( + s"Schema must be provided in order to parse kafka message '$colName' column of $formatStr format" + )) + schemas.getOrElse(keySchemaId, + throw new NoSuchElementException(s"Schema with id = '$keySchemaId' not found.") + ) + } + + format match { + case KafkaTopicFormat.String => col(colName).cast(StringType).as(colName) + case KafkaTopicFormat.Json => + val keySchema = schemaGetter("JSON") + from_json(col(colName).cast(StringType), keySchema.schema).alias(colName) + case KafkaTopicFormat.Xml => + val keySchema = schemaGetter("XML") + from_json(xmlToJson(col(colName).cast(StringType)), keySchema.schema).alias(colName) + case other => throw new IllegalArgumentException( + s"Wrong kafka topic message format for column '$colName': ${other.toString}" + ) // we do not support avro for now. + } + } + /** * Checks connection. 
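A sketch (not part of the patch) of the column expressions decodeColumn produces for the supported formats; the schema id "value_schema" is hypothetical:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StringType

// KafkaTopicFormat.String: plain cast, no schema needed
val asString = col("value").cast(StringType).as("value")
// KafkaTopicFormat.Json: cast to string, then parse with the registered schema:
//   from_json(col("value").cast(StringType), schemas("value_schema").schema).alias("value")
// KafkaTopicFormat.Xml: the value is first converted to JSON via the xmlToJson UDF, then parsed the same way.
// KafkaTopicFormat.Avro: not supported yet, raises IllegalArgumentException.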
* @@ -68,36 +105,73 @@ case class KafkaConnection(config: KafkaConnectionConfig) extends DQConnection { /** * Loads external data into dataframe given a source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object + * * @param sourceConfig Source configuration + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Implicit Map of all explicitly defined schemas (schemaId -> SourceSchema) * @return Spark DataFrame */ - def loadDataframe(sourceConfig: SourceType) + def loadDataFrame(sourceConfig: SourceType) (implicit settings: AppSettings, - spark: SparkSession): DataFrame = { - import spark.implicits._ + spark: SparkSession, + schemas: Map[String, SourceSchema]): DataFrame = { val allOptions = kafkaParams ++ paramsSeqToMap(sourceConfig.options.map(_.value)) + - getSubscribeOption(sourceConfig) + - ("startingOffsets" -> sourceConfig.startingOffsets.value) + - ("endingOffsets" -> sourceConfig.endingOffsets.value) + getSubscribeOption(sourceConfig.topics.map(_.value), sourceConfig.topicPattern.map(_.value)) + + ("startingOffsets" -> sourceConfig.startingOffsets.map(_.value).getOrElse("earliest")) + + ("endingOffsets" -> sourceConfig.endingOffsets.map(_.value).getOrElse("latest")) val rawDF = spark.read.format("kafka").options(allOptions).load() - val decodedDF = sourceConfig.format match { - case KafkaTopicFormat.Json => - rawDF.select(col("value").cast(StringType).alias("data")) - case KafkaTopicFormat.Xml => - rawDF.select(xmlToJson(col("value").cast(StringType)).alias("data")) - case fmt => throw new IllegalArgumentException( - s"Wrong kafka topic format for kafka source $id: ${fmt.toString}" - ) // we do not support avro for now. 
+ + val keyColumn = decodeColumn("key", sourceConfig.keyFormat, sourceConfig.keySchema.map(_.value)) + val valueColumn = decodeColumn("value", sourceConfig.valueFormat, sourceConfig.valueSchema.map(_.value)) + + rawDF.select(keyColumn, valueColumn) + } + + /** + * Loads stream into a dataframe given the stream configuration + * + * @param sourceConfig Source configuration + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Implicit Map of all explicitly defined schemas (schemaId -> SourceSchema) + * @return Spark Streaming DataFrame + */ + def loadDataStream(sourceConfig: SourceType) + (implicit settings: AppSettings, + spark: SparkSession, + schemas: Map[String, SourceSchema]): DataFrame = { + val allOptions = kafkaParams ++ + paramsSeqToMap(sourceConfig.options.map(_.value)) + + getSubscribeOption(sourceConfig.topics.map(_.value), sourceConfig.topicPattern.map(_.value)) + + ("startingOffsets" -> sourceConfig.startingOffsets.map(_.value).getOrElse("latest")) + + val windowTsCol = settings.streamConfig.windowTsCol + val eventTsCol = settings.streamConfig.eventTsCol + val windowDuration = settings.streamConfig.window.toSparkInterval + + val rawStream = spark.readStream.format("kafka").options(allOptions).load() + + val keyColumn = decodeColumn("key", sourceConfig.keyFormat, sourceConfig.keySchema.map(_.value)) + val valueColumn = decodeColumn("value", sourceConfig.valueFormat, sourceConfig.valueSchema.map(_.value)) + val eventColumn = sourceConfig.windowBy match { + case ProcessingTime => current_timestamp().cast(LongType) + case EventTime => col("timestamp") + case CustomTime(colName) => col(colName).cast(LongType) } - val schema = spark.read.json(decodedDF.select("data").as[String]).schema - - decodedDF.withColumn("jsonData", from_json(col("data"), schema)).select("jsonData.*") + rawStream.select(keyColumn, valueColumn, col("timestamp")) + .withColumn(eventTsCol, eventColumn) + .withColumn(s"${windowTsCol}_pre", window(col(eventTsCol).cast(TimestampType), windowDuration)) + .select( + col(eventTsCol), + col(s"${windowTsCol}_pre").getField("start").cast(LongType).as(windowTsCol), + col("key"), + col("value") + ) } /** diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/context/DQContext.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/context/DQContext.scala index a9a2ffad..b94d05f5 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/context/DQContext.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/context/DQContext.scala @@ -44,7 +44,7 @@ import scala.util.Try */ class DQContext(settings: AppSettings, spark: SparkSession, fs: FileSystem) extends Logging { - implicit private val sparkSess: SparkSession = spark + implicit private val sparkSes: SparkSession = spark implicit private val fileSystem: FileSystem = fs implicit private val appSettings: AppSettings = settings diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/core/Source.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/core/Source.scala index ecc9f5a2..db537ab3 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/core/Source.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/core/Source.scala @@ -4,7 +4,8 @@ import org.apache.spark.sql.DataFrame /** * Data Quality Source definition - * + * @note Source can hold both static and streaming dataframes and, therefore, both batch source readers + * and stream source readers return same source definition. 
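A small arithmetic sketch (not part of the patch) of what the window-start timestamp stored in windowTsCol represents for a tumbling window with no start offset; windowStart is a hypothetical helper:

import scala.concurrent.duration.Duration

def windowStart(eventTsSec: Long, window: Duration): Long = {
  val w = window.toSeconds
  (eventTsSec / w) * w  // event time floored to the window size
}

windowStart(1700000123L, Duration("10m"))  // 1699999800: start of the 10-minute bucket containing the event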
* @param id Source ID * @param df Spark dataframe with source data * @param keyFields Key field (columns) of this source: uniquely define data row @@ -15,4 +16,6 @@ case class Source( df: DataFrame, keyFields: Seq[String] = Seq.empty, parents: Seq[String] = Seq.empty - ) + ) { + val isStreaming: Boolean = df.isStreaming +} diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/readers/SourceReaders.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/readers/SourceReaders.scala index 9fcb6c1a..3642f52c 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/readers/SourceReaders.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/readers/SourceReaders.scala @@ -1,6 +1,8 @@ package ru.raiffeisen.checkita.readers +import enumeratum.{Enum, EnumEntry} import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -13,13 +15,24 @@ import ru.raiffeisen.checkita.core.Source import ru.raiffeisen.checkita.readers.SchemaReaders.SourceSchema import ru.raiffeisen.checkita.utils.Common.paramsSeqToMap import ru.raiffeisen.checkita.utils.ResultUtils._ +import ru.raiffeisen.checkita.utils.SparkUtils.getRowEncoder import java.io.FileNotFoundException import scala.annotation.tailrec +import scala.collection.immutable import scala.util.Try object SourceReaders { + /** + * Sources can be read in two modes: as a static dataframe and as a stream one. + */ + sealed private trait ReadMode extends EnumEntry + private object ReadMode extends Enum[ReadMode] { + case object Batch extends ReadMode + case object Stream extends ReadMode + override val values: immutable.IndexedSeq[ReadMode] = findValues + } /** * Base source reader trait @@ -36,21 +49,28 @@ object SourceReaders { */ protected def toSource(config: T, df: DataFrame): Source = Source(config.id.value, df, config.keyFields.map(_.value)) - + /** * Tries to read source given the source configuration. - * @param config Source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source + * + * @note Safeguard against reading non-streamable source as a stream is implemented in the higher-level + * method that uses this one. Therefore, current method implementation may just ignore 'readMode' + * argument for non-streamable sources. */ - def tryToRead(config: T)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source + def tryToRead(config: T, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source /** * Safely reads source given source configuration. 
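A minimal sketch (not part of the patch) showing how downstream code can branch on the new isStreaming flag; describe is a hypothetical helper:

import ru.raiffeisen.checkita.core.Source

def describe(src: Source): String =
  if (src.isStreaming) s"streaming source '${src.id}'" else s"batch source '${src.id}'"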
@@ -66,12 +86,78 @@ object SourceReaders { fs: FileSystem, schemas: Map[String, SourceSchema], connections: Map[String, DQConnection]): Result[Source] = - Try(tryToRead(config)).toResult( + Try(tryToRead(config, ReadMode.Batch)).toResult( preMsg = s"Unable to read source '${config.id.value}' due to following error: " ) - + + /** + * Safely reads streaming source given source configuration. + * + * @param config Source configuration (source must be streamable) + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * @param connections Map of existing connection (connectionID -> DQConnection) + * @return Either a valid Source or a list of source reading errors. + */ + def readStream(config: T)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Result[Source] = Try { + if (config.streamable) tryToRead(config, ReadMode.Stream) + else throw new UnsupportedOperationException( + s"Source ${config.id} is not streamable and, therefore, cannot be read as a stream." + ) + }.toResult( + preMsg = s"Unable to read streaming source '${config.id.value}' due to following error: " + ) + } + + sealed trait SimpleFileReader { this: SourceReader[_] => + + /** + * Basic file source reader that reads file source either + * as a static dataframe or as a streaming dataframe. + * + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param path Path to read source from + * @param format File format + * @param schemaId Schema ID to apply while reading data + * @param spark Implicit spark session object + * @return Spark DataFrame + */ + protected def fileReader(readMode: ReadMode, + path: String, + format: String, + schemaId: Option[String]) + (implicit spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema]): DataFrame = { + + val reader = (schema: Option[SourceSchema]) => readMode match { + case ReadMode.Batch => + val batchReader = spark.read.format(format.toLowerCase) + if (format.toLowerCase == "avro") + schema.map(sch => batchReader.option("avroSchema", sch.toAvroSchema.toString)) + .getOrElse(batchReader).load(path) + else schema.map(sch => batchReader.schema(sch.schema)).getOrElse(batchReader).load(path) + case ReadMode.Stream => + val sch = schema.getOrElse(throw new IllegalArgumentException( + s"Schema is missing but it must be provided to read $format files as a stream." + )) + spark.readStream.format(format.toLowerCase).schema(sch.schema).load(path) + } + + if (fs.exists(new Path(path))) { + val sourceSchema = schemaId.map(sId => + schemas.getOrElse(sId, throw new NoSuchElementException(s"Schema with id = '$sId' not found.")) + ) // we want to throw exception if schemaId is provided but not found. + reader(sourceSchema) + } else throw new FileNotFoundException(s"$format file or directory not found: $path") + } } - + /** * Table source reader: reads source from JDBC Connection (Postgres, Oracle, etc) * @note In order to read table source it is required to provided map of valid connections @@ -80,25 +166,30 @@ object SourceReaders { /** * Tries to read table source given the source configuration. 
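A sketch (not part of the patch) of the batch-versus-stream pattern that fileReader follows, assuming a SparkSession spark, a StructType sch and a String path:

// batch: the schema is optional, Spark can infer it from the files themselves
val batchDf = spark.read.format("parquet").load(path)
// stream: Structured Streaming does not infer file schemas by default, so an explicit schema is required
val streamDf = spark.readStream.format("parquet").schema(sch).load(path)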
- * @param config Table source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Table source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source + * @note TableSource is not streamable, therefore, 'readMode' argument is ignored + * and source is always read as static DataFrame. */ - def tryToRead(config: TableSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = { + def tryToRead(config: TableSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = { val conn = connections.getOrElse(config.connection.value, throw new NoSuchElementException( s"JDBC connection with id = '${config.connection.value}' not found." )) require(conn.isInstanceOf[JdbcConnection[_]], s"Table source '${config.id.value}' refers to non-Jdbc connection.") - val df = conn.asInstanceOf[JdbcConnection[_]].loadDataframe(config) + val df = conn.asInstanceOf[JdbcConnection[_]].loadDataFrame(config) toSource(config, df) } } @@ -111,18 +202,21 @@ object SourceReaders { /** * Tries to read kafka source given the source configuration. - * @param config Kafka source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Kafka source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source */ - def tryToRead(config: KafkaSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = { + def tryToRead(config: KafkaSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = { val conn = connections.getOrElse(config.connection.value, throw new NoSuchElementException( s"Kafka connection with id = '${config.connection.value}' not found." 
@@ -131,7 +225,10 @@ object SourceReaders { require(conn.isInstanceOf[KafkaConnection], s"Kafka source '${config.id.value}' refers to not a Kafka connection.") - val df = conn.asInstanceOf[KafkaConnection].loadDataframe(config) + val df = readMode match { + case ReadMode.Batch => conn.asInstanceOf[KafkaConnection].loadDataFrame(config) + case ReadMode.Stream => conn.asInstanceOf[KafkaConnection].loadDataStream(config) + } toSource(config, df) } } @@ -144,18 +241,23 @@ object SourceReaders { /** * Tries to read hive source given the source configuration. - * @param config Hive source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Hive source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source + * @note HiveSource is not streamable, therefore, 'readMode' argument is ignored + * and source is always read as static DataFrame. */ - def tryToRead(config: HiveSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = { + def tryToRead(config: HiveSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = { val tableName = s"${config.schema.value}.${config.table.value}" val preDf = spark.read.table(tableName) val df = if (config.partitions.nonEmpty) { @@ -194,40 +296,50 @@ object SourceReaders { * @return Parsed row */ private def getRow(x: String, widths: Seq[Int]) = Row.fromSeq( - widthsToPositions(widths).map{ + widthsToPositions(widths).map { case (p1, p2) => Try(x.substring(p1, p2)).getOrElse(null) } ) /** * Tries to read fixed file source given the source configuration. - * @param config Fixed file source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Fixed file source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source + * @note When read in stream mode, Spark will stream newly added files only. */ - def tryToRead(config: FixedFileSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = { - - val sourceSchema = schemas.getOrElse(config.schema.value, throw new NoSuchElementException( - s"Schema with id = '${config.schema.value}' not found." 
- )) + def tryToRead(config: FixedFileSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = { + + val schemaId = config.schema.map(_.value).getOrElse( + throw new IllegalArgumentException("Schema must always be provided to read fixed-width file.") + ) + val sourceSchema = schemas.getOrElse(schemaId, + throw new NoSuchElementException(s"Schema with id = '$schemaId' not found.") + ) val allStringSchema = StructType(sourceSchema.schema.map( col => StructField(col.name, StringType, nullable = true) )) - - val rdd = if (fs.exists(new Path(config.path.value))) - spark.sparkContext.textFile(config.path.value).map(r => getRow(r, sourceSchema.columnWidths)) - else throw new FileNotFoundException(s"Fixed-width text file not found: ${config.path.value}") - val df = spark.createDataFrame(rdd, allStringSchema).select( - sourceSchema.schema.map(f => col(f.name).cast(f.dataType)) :_* + + implicit val encoder: ExpressionEncoder[Row] = getRowEncoder(allStringSchema) + + val rawDf = if (fs.exists(new Path(config.path.value))) readMode match { + case ReadMode.Batch => spark.read.text(config.path.value) + case ReadMode.Stream => spark.readStream.text(config.path.value) + } else throw new FileNotFoundException(s"Fixed-width text file or directory not found: ${config.path.value}") + + val df = rawDf.map(c => getRow(c.getString(0), sourceSchema.columnWidths)).select( + sourceSchema.schema.map(f => col(f.name).cast(f.dataType)): _* ) - toSource(config, df) } } @@ -242,37 +354,52 @@ object SourceReaders { /** * Tries to read delimited file source given the source configuration. - * @param config Delimited file source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Delimited file source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source + * @note When read in stream mode, Spark will stream newly added files only. 
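+   * @example A fixed-width source must always reference an explicitly defined schema,
+   *          e.g. (illustrative snippet mirroring the test suite; the path is a placeholder):
+   *          {{{
+   *            FixedFileSourceConfig(ID("fixedSource"), Refined.unsafeApply("/data/fixed.txt"), Some(ID("fixedFull")))
+   *          }}}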
*/ - def tryToRead(config: DelimitedFileSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = { - val preDf = spark.read.format("csv") - .option("sep", config.delimiter.value) - .option("quote", config.quote.value) - .option("escape", config.escape.value) - .option("mode", "FAILFAST") - + def tryToRead(config: DelimitedFileSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = { + + val reader = (opts: Map[String, String], schema: Option[StructType]) => readMode match { + case ReadMode.Batch => + val batchReader = spark.read.format("csv").options(opts) + schema.map(s => batchReader.schema(s)).getOrElse(batchReader).load(config.path.value) + case ReadMode.Stream => + val streamReader = spark.readStream.format("csv").options(opts) + schema.map(s => streamReader.schema(s)).getOrElse(streamReader).load(config.path.value) + } + + val readOptions = Map( + "sep" -> config.delimiter.value, + "quote" -> config.quote.value, + "escape" -> config.escape.value, + "mode" -> (if (readMode == ReadMode.Batch) "FAILFAST" else "PERMISSIVE") + ) + val df = if (fs.exists(new Path(config.path.value))) { (config.header, config.schema.map(_.value)) match { - case (true, None) => preDf.option("header", value = true).load(config.path.value) + case (true, None) => reader(readOptions + ("header" -> "true"), None) case (false, Some(schema)) => val sourceSchema = schemas.getOrElse(schema, throw new NoSuchElementException( s"Schema with id = '$schema' not found." )) - preDf.schema(sourceSchema.schema).load(config.path.value) + reader(readOptions, Some(sourceSchema.schema)) case _ => throw new IllegalArgumentException( "For delimited file sources schema must either be read from header or from explicit schema but not from both." ) } - } else throw new FileNotFoundException(s"Delimited text file not found: ${config.path.value}") + } else throw new FileNotFoundException(s"Delimited text file or directory not found: ${config.path.value}") toSource(config, df) } @@ -282,119 +409,115 @@ object SourceReaders { * Avro file source reader: reads avro file with optional explicit schema. * @note In order to read avro file source it is required to provide map of source schemas. */ - implicit object AvroFileSourceReader extends SourceReader[AvroFileSourceConfig] { + implicit object AvroFileSourceReader extends SourceReader[AvroFileSourceConfig] with SimpleFileReader { /** * Tries to read avro file source given the source configuration. - * @param config Avro file source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Avro file source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source + * @note When read in stream mode, Spark will stream newly added files only. 
*/ - def tryToRead(config: AvroFileSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = { - val preDf = spark.read.format("avro") - - val df = if (fs.exists(new Path(config.path.value))) { - config.schema.map(_.value) match { - case Some(schema) => - val sourceSchema = schemas.getOrElse(schema, throw new NoSuchElementException( - s"Schema with id = '$schema' not found." - )) - preDf.option("avroSchema", sourceSchema.toAvroSchema.toString).load(config.path.value) - case None => preDf.load(config.path.value) - } - } else throw new FileNotFoundException(s"Avro file not found: ${config.path.value}") - - toSource(config, df) - } + def tryToRead(config: AvroFileSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = + toSource(config, fileReader(readMode, config.path.value, "Avro", config.schema.map(_.value))) } /** * Parquet file source reader: reads parquet files. */ - implicit object ParquetFileSourceReader extends SourceReader[ParquetFileSourceConfig] { + implicit object ParquetFileSourceReader extends SourceReader[ParquetFileSourceConfig] with SimpleFileReader { /** * Tries to read parquet file source given the source configuration. - * @param config Parquet file source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Parquet file source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source + * @note When read in stream mode, Spark will stream newly added files only. */ - def tryToRead(config: ParquetFileSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = - if (fs.exists(new Path(config.path.value))) - toSource(config, spark.read.parquet(config.path.value)) - else throw new FileNotFoundException(s"Parquet file not found: ${config.path.value}") + def tryToRead(config: ParquetFileSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = + toSource(config, fileReader(readMode, config.path.value, "Parquet", config.schema.map(_.value))) } /** * Orc file source reader: reads orc files. */ - implicit object OrcFileSourceReader extends SourceReader[OrcFileSourceConfig] { + implicit object OrcFileSourceReader extends SourceReader[OrcFileSourceConfig] with SimpleFileReader { /** * Tries to read orc file source given the source configuration. - * @param config Orc file source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Orc file source configuration + * @param readMode Mode in which source is read. 
Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source */ - def tryToRead(config: OrcFileSourceConfig)(implicit settings: AppSettings, - spark: SparkSession, - fs: FileSystem, - schemas: Map[String, SourceSchema], - connections: Map[String, DQConnection]): Source = - if (fs.exists(new Path(config.path.value))) - toSource(config, spark.read.orc(config.path.value)) - else throw new FileNotFoundException(s"ORC file not found: ${config.path.value}") + def tryToRead(config: OrcFileSourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, + spark: SparkSession, + fs: FileSystem, + schemas: Map[String, SourceSchema], + connections: Map[String, DQConnection]): Source = + toSource(config, fileReader(readMode, config.path.value, "ORC", config.schema.map(_.value))) } implicit object CustomSourceReader extends SourceReader[CustomSource] { + /** * Tries to read source given the source configuration. * * @param config Source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' * @param settings Implicit application settings object * @param spark Implicit spark session object * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source */ - def tryToRead(config: CustomSource)(implicit settings: AppSettings, + def tryToRead(config: CustomSource, + readMode: ReadMode)(implicit settings: AppSettings, spark: SparkSession, fs: FileSystem, schemas: Map[String, SourceSchema], connections: Map[String, DQConnection]): Source = { val readOptions = paramsSeqToMap(config.options.map(_.value)) - val sparkReaderInit = spark.read.format(config.format.value).options(readOptions) - // if schema is provided use it explicitly in DataFrame reader: - val sparkReader = config.schema.map(_.value) match { - case Some(schema) => - val sourceSchema = schemas.getOrElse(schema, throw new NoSuchElementException( - s"Schema with id = '$schema' not found." - )) - sparkReaderInit.schema(sourceSchema.schema) - case None => sparkReaderInit - } - // if path to load from is provided then load source from that path: - val df = config.path.map(_.value) match { - case Some(path) => sparkReader.load(path) - case None => sparkReader.load() + val sourceSchema = config.schema.map(_.value).map(sId => + schemas.getOrElse(sId, throw new NoSuchElementException(s"Schema with id = '$sId' not found.")) + ) // we want to throw exception if schemaId is provided but not found. + + val df = readMode match { + case ReadMode.Batch => + val readerInit = spark.read.format(config.format.value).options(readOptions) + val reader = sourceSchema.map(s => readerInit.schema(s.schema)).getOrElse(readerInit) + config.path.map(_.value).map(p => reader.load(p)).getOrElse(reader.load()) + case ReadMode.Stream => + val readerInit = spark.readStream.format(config.format.value).options(readOptions) + val reader = sourceSchema.map(s => readerInit.schema(s.schema)).getOrElse(readerInit) + config.path.map(_.value).map(p => reader.load(p)).getOrElse(reader.load()) } + toSource(config, df) } } @@ -403,37 +526,41 @@ object SourceReaders { * Generic regular source reader that calls specific reader depending on the source configuration type. 
*/ implicit object AnySourceReader extends SourceReader[SourceConfig] { + /** * Tries to read any regular source given the source configuration. - * @param config Regular source configuration - * @param settings Implicit application settings object - * @param spark Implicit spark session object - * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) + * + * @param config Regular source configuration + * @param readMode Mode in which source is read. Either 'batch' or 'stream' + * @param settings Implicit application settings object + * @param spark Implicit spark session object + * @param schemas Map of explicitly defined schemas (schemaId -> SourceSchema) * @param connections Map of existing connection (connectionID -> DQConnection) * @return Source */ - def tryToRead(config: SourceConfig)(implicit settings: AppSettings, + def tryToRead(config: SourceConfig, + readMode: ReadMode)(implicit settings: AppSettings, spark: SparkSession, fs: FileSystem, schemas: Map[String, SourceSchema], connections: Map[String, DQConnection]): Source = config match { - case table: TableSourceConfig => TableSourceReader.tryToRead(table) - case kafka: KafkaSourceConfig => KafkaSourceReader.tryToRead(kafka) - case hive: HiveSourceConfig => HiveSourceReader.tryToRead(hive) - case fixed: FixedFileSourceConfig => FixedFileSourceReader.tryToRead(fixed) - case delimited: DelimitedFileSourceConfig => DelimitedFileSourceReader.tryToRead(delimited) - case avro: AvroFileSourceConfig => AvroFileSourceReader.tryToRead(avro) - case parquet: ParquetFileSourceConfig => ParquetFileSourceReader.tryToRead(parquet) - case orc: OrcFileSourceConfig => OrcFileSourceReader.tryToRead(orc) - case custom: CustomSource => CustomSourceReader.tryToRead(custom) + case table: TableSourceConfig => TableSourceReader.tryToRead(table, readMode) + case kafka: KafkaSourceConfig => KafkaSourceReader.tryToRead(kafka, readMode) + case hive: HiveSourceConfig => HiveSourceReader.tryToRead(hive, readMode) + case fixed: FixedFileSourceConfig => FixedFileSourceReader.tryToRead(fixed, readMode) + case delimited: DelimitedFileSourceConfig => DelimitedFileSourceReader.tryToRead(delimited, readMode) + case avro: AvroFileSourceConfig => AvroFileSourceReader.tryToRead(avro, readMode) + case parquet: ParquetFileSourceConfig => ParquetFileSourceReader.tryToRead(parquet, readMode) + case orc: OrcFileSourceConfig => OrcFileSourceReader.tryToRead(orc, readMode) + case custom: CustomSource => CustomSourceReader.tryToRead(custom, readMode) case other => throw new IllegalArgumentException(s"Unsupported source type: '${other.getClass.getTypeName}'") } } /** - * Implicit conversion for source configurations to enable read method for them. + * Implicit conversion for source configurations to enable read and readStream methods for them. 
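+ * For example (illustrative; 'sourceConfig' stands for any supported source configuration
+ * and assumes the required implicit reader, application settings, spark session,
+ * file system, schemas and connections are in scope):
+ * {{{
+ *   val batchSource: Result[Source]  = sourceConfig.read
+ *   val streamSource: Result[Source] = sourceConfig.readStream
+ * }}}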
* @param config Source configuration * @param reader Implicit reader for given source configuration * @param settings Implicit application settings object @@ -450,5 +577,6 @@ object SourceReaders { schemas: Map[String, SourceSchema], connections: Map[String, DQConnection]) { def read: Result[Source] = reader.read(config) + def readStream: Result[Source] = reader.readStream(config) } } diff --git a/checkita-core/src/main/scala/ru/raiffeisen/checkita/utils/SparkUtils.scala b/checkita-core/src/main/scala/ru/raiffeisen/checkita/utils/SparkUtils.scala index 8dffd9da..4b8c4871 100644 --- a/checkita-core/src/main/scala/ru/raiffeisen/checkita/utils/SparkUtils.scala +++ b/checkita-core/src/main/scala/ru/raiffeisen/checkita/utils/SparkUtils.scala @@ -2,11 +2,14 @@ package ru.raiffeisen.checkita.utils import org.apache.hadoop.fs.FileSystem import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import ru.raiffeisen.checkita.utils.ResultUtils._ +import scala.reflect.runtime.universe.{runtimeMirror, typeOf, TermName} +import scala.concurrent.duration._ import scala.util.Try import scala.util.matching.Regex @@ -93,4 +96,53 @@ object SparkUtils { fs.setWriteChecksum(false) // Suppress .crc files creation: fs }.toResult(preMsg = "Failed to create FileSystem object due to following error:") + + /** + * Implicit class conversion for scala duration object, to enhance it with + * following methods: + * - convert duration into spark interval string + * - convert duration into string with short notation of time unit (e.g. '10s' or '3h') + * Conversion is always made in terms of seconds. + * @param d Scala Duration object + */ + implicit class DurationOps(d: Duration) { + private val timeUnits: Map[TimeUnit, String] = Map( + DAYS -> "d", + HOURS -> "h", + MINUTES -> "m", + SECONDS -> "s", + MILLISECONDS -> "ms", + MICROSECONDS -> "micro", // avoid special using character 'ยต' + NANOSECONDS -> "ns" + ) + def toSparkInterval: String = s"${d.toSeconds} seconds" + def toShortString: String = d match { + case Duration(l, tu) => s"$l${timeUnits(tu)}" + } + } + + /** + * Gets row encoder for provided schema. + * Purpose of this method is to provide ability to create + * row encoder for different versions of Spark. + * Thus, Encoders API has changes in version 3.5.0. + * Scala reflection API is used to invoke proper + * row encoder constructor thus supporting different Encoders APIs. + * @param schema Dataframe schema to construct encoder for. + * @param spark Implicit spark session object. + * @return Row encoder (expression encoder for row). 
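+   *
+   * For example (illustrative; assumes an implicit SparkSession named 'spark' is in scope
+   * and 'some_file.txt' is a placeholder path):
+   * {{{
+   *   val rawSchema = StructType(Seq(StructField("value", StringType, nullable = true)))
+   *   implicit val enc: ExpressionEncoder[Row] = getRowEncoder(rawSchema)
+   *   val rows = spark.read.text("some_file.txt").map(r => Row(r.getString(0)))
+   * }}}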
+ */ + def getRowEncoder(schema: StructType) + (implicit spark: SparkSession): ExpressionEncoder[Row] = { + val rm = runtimeMirror(getClass.getClassLoader) + val (encoderMirror, encoderType) = + if (spark.version < "3.5.0") (rm.reflect(RowEncoder), typeOf[RowEncoder.type]) + else (rm.reflect(ExpressionEncoder), typeOf[ExpressionEncoder.type]) + val applyMethodSymbol = encoderType.decl(TermName("apply")).asTerm.alternatives.find(s => + s.asMethod.paramLists.map(_.map(_.typeSignature)) == List(List(typeOf[StructType])) + ).get.asMethod + val applyMethod = encoderMirror.reflectMethod(applyMethodSymbol) + applyMethod(schema).asInstanceOf[ExpressionEncoder[Row]] + } + } diff --git a/checkita-core/src/test/scala/ru/raiffeisen/checkita/readers/SourceReadersSpec.scala b/checkita-core/src/test/scala/ru/raiffeisen/checkita/readers/SourceReadersSpec.scala index d1ee5c0e..ac008e3c 100644 --- a/checkita-core/src/test/scala/ru/raiffeisen/checkita/readers/SourceReadersSpec.scala +++ b/checkita-core/src/test/scala/ru/raiffeisen/checkita/readers/SourceReadersSpec.scala @@ -87,10 +87,10 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { "correctly read fixed-width text file" in { val fixedFullSourceConfig = FixedFileSourceConfig( - ID("fixedFullSource"), Refined.unsafeApply(filePath), "fixedFull" + ID("fixedFullSource"), Refined.unsafeApply(filePath), Some(ID("fixedFull")) ) val fixedShortSourceConfig = FixedFileSourceConfig( - ID("fixedShortSource"), Refined.unsafeApply(filePath), "fixedShort" + ID("fixedShortSource"), Refined.unsafeApply(filePath), Some(ID("fixedShort")) ) val fixedFullSource = FixedFileSourceReader.read(fixedFullSourceConfig) @@ -110,12 +110,12 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { } "return error when file not found" in { - val sourceConfig = FixedFileSourceConfig(ID("fixedFullSource"), "some_file.txt", "fixedFull") + val sourceConfig = FixedFileSourceConfig(ID("fixedFullSource"), "some_file.txt", Some(ID("fixedFull"))) FixedFileSourceReader.read(sourceConfig).isLeft shouldEqual true } "return error when schema not found" in { - val sourceConfig = FixedFileSourceConfig(ID("fixedFullSource"), Refined.unsafeApply(filePath), "some_schema") + val sourceConfig = FixedFileSourceConfig(ID("fixedFullSource"), Refined.unsafeApply(filePath), Some(ID("some_schema"))) FixedFileSourceReader.read(sourceConfig).isLeft shouldEqual true } } @@ -129,7 +129,7 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { ID("sourceHeader"), Refined.unsafeApply(fileWithHeader), header = true, schema = None ) val sourceConfigHeadless = DelimitedFileSourceConfig( - ID("sourceHeadless"), Refined.unsafeApply(fileWithoutHeader), schema = Some("delimited") + ID("sourceHeadless"), Refined.unsafeApply(fileWithoutHeader), schema = Some(ID("delimited")) ) val sourceHeader = DelimitedFileSourceReader.read(sourceConfigHeader) @@ -155,14 +155,14 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { "return error when schema not found" in { val sourceConfig = DelimitedFileSourceConfig( - ID("sourceHeadless"), Refined.unsafeApply(fileWithoutHeader), schema = Some("some_schema") + ID("sourceHeadless"), Refined.unsafeApply(fileWithoutHeader), schema = Some(ID("some_schema")) ) DelimitedFileSourceReader.read(sourceConfig).isLeft shouldEqual true } "return error when both header and schema are provided in source configuration or none of them is provided" in { val sourceConfig1 = DelimitedFileSourceConfig( - ID("sourceConfig1"), Refined.unsafeApply(fileWithHeader), header = true, 
schema = Some("some_schema") + ID("sourceConfig1"), Refined.unsafeApply(fileWithHeader), header = true, schema = Some(ID("some_schema")) ) val sourceConfig2 = DelimitedFileSourceConfig( ID("sourceConfig2"), Refined.unsafeApply(fileWithHeader), schema = None @@ -192,7 +192,7 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { "correctly read avro file with schema" in { val sourceConfig = AvroFileSourceConfig( - ID("avroSource"), Refined.unsafeApply(avroFilePath), schema = Some("avro") + ID("avroSource"), Refined.unsafeApply(avroFilePath), schema = Some(ID("avro")) ) val source = AvroFileSourceReader.read(sourceConfig) @@ -212,7 +212,7 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { "return error when schema provided but not found" in { val sourceConfig = AvroFileSourceConfig( - ID("avroSource"), Refined.unsafeApply(avroFilePath), schema = Some("some_schema") + ID("avroSource"), Refined.unsafeApply(avroFilePath), schema = Some(ID("some_schema")) ) AvroFileSourceReader.read(sourceConfig).isLeft shouldEqual true } @@ -222,7 +222,7 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { val parquetFilePath = getClass.getResource("/data/companies/inc_500_companies_2019.parquet").getPath "correctly read parquet file" in { - val sourceConfig = ParquetFileSourceConfig(ID("parquetSource"), Refined.unsafeApply(parquetFilePath)) + val sourceConfig = ParquetFileSourceConfig(ID("parquetSource"), Refined.unsafeApply(parquetFilePath), None) val source = ParquetFileSourceReader.read(sourceConfig) @@ -235,7 +235,7 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { } "return error when file not found" in { - val sourceConfig = ParquetFileSourceConfig(ID("parquetSource"), "some_file.txt") + val sourceConfig = ParquetFileSourceConfig(ID("parquetSource"), "some_file.txt", None) ParquetFileSourceReader.read(sourceConfig).isLeft shouldEqual true } } @@ -244,7 +244,7 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { val orcFilePath = getClass.getResource("/data/companies/inc_500_companies_2019.orc").getPath "correctly read orc file" in { - val sourceConfig = OrcFileSourceConfig(ID("orcSource"), Refined.unsafeApply(orcFilePath)) + val sourceConfig = OrcFileSourceConfig(ID("orcSource"), Refined.unsafeApply(orcFilePath), None) val source = OrcFileSourceReader.read(sourceConfig) @@ -257,7 +257,7 @@ class SourceReadersSpec extends AnyWordSpec with Matchers { } "return error when file not found" in { - val sourceConfig = OrcFileSourceConfig(ID("orcSource"), "some_file.txt") + val sourceConfig = OrcFileSourceConfig(ID("orcSource"), "some_file.txt", None) OrcFileSourceReader.read(sourceConfig).isLeft shouldEqual true } }