feat: adding streaming sources and stream readers #16

Merged 1 commit on Nov 24, 2023

Changes from all commits
@@ -4,7 +4,7 @@ import org.apache.log4j.Level
import org.apache.spark.SparkConf
import ru.raiffeisen.checkita.config.IO.readAppConfig
import ru.raiffeisen.checkita.config.Parsers._
import ru.raiffeisen.checkita.config.appconf.{AppConfig, EmailConfig, MattermostConfig, StorageConfig}
import ru.raiffeisen.checkita.config.appconf.{AppConfig, EmailConfig, MattermostConfig, StorageConfig, StreamConfig}
import ru.raiffeisen.checkita.utils.Common.{paramsSeqToMap, prepareConfig}
import ru.raiffeisen.checkita.utils.ResultUtils._
import ru.raiffeisen.checkita.utils.EnrichedDT
@@ -27,6 +27,7 @@ import scala.util.Try
* @param storageConfig Configuration of connection to Data Quality Storage
* @param emailConfig Configuration of connection to SMTP server
* @param mattermostConfig Configuration of connection to Mattermost API
* @param streamConfig Streaming settings (used in streaming applications only)
* @param sparkConf Spark configuration parameters
* @param isLocal Boolean flag indicating whether spark application must be run locally.
* @param isShared Boolean flag indicating whether spark application running within shared spark context.
@@ -47,6 +48,7 @@ final case class AppSettings(
storageConfig: Option[StorageConfig],
emailConfig: Option[EmailConfig],
mattermostConfig: Option[MattermostConfig],
streamConfig: StreamConfig,
sparkConf: SparkConf,
isLocal: Boolean,
isShared: Boolean,
@@ -124,6 +126,7 @@ object AppSettings {
appConfig.storage,
appConfig.email,
appConfig.mattermost,
appConfig.streaming,
conf,
isLocal,
isShared,
@@ -190,6 +193,7 @@ object AppSettings {
defaultAppConf.storage,
defaultAppConf.email,
defaultAppConf.mattermost,
defaultAppConf.streaming,
new SparkConf(),
isLocal = false,
isShared = false,
@@ -1,6 +1,7 @@
package ru.raiffeisen.checkita.config

import enumeratum.{Enum, EnumEntry}
import ru.raiffeisen.checkita.config.Parsers.idParser

import scala.collection.immutable

@@ -27,13 +28,39 @@ object Enums {
*/
sealed trait KafkaTopicFormat extends EnumEntry
object KafkaTopicFormat extends Enum[KafkaTopicFormat] {
case object String extends KafkaTopicFormat
case object Xml extends KafkaTopicFormat
case object Json extends KafkaTopicFormat
case object Avro extends KafkaTopicFormat

override val values: immutable.IndexedSeq[KafkaTopicFormat] = findValues
}

/**
* Supported Kafka windowing types that control how windows are built:
* - based on processing time: the current timestamp at the moment a row is processed;
* - based on event time: the Kafka message creation timestamp;
* - custom time: an arbitrary timestamp column from the Kafka message, defined by the user.
* @note This is not an enumeration, since the custom time type requires a user-defined column name.
*/
sealed trait KafkaWindowing { val windowBy: String }
case object ProcessingTime extends KafkaWindowing { val windowBy: String = "processingTime" }
case object EventTime extends KafkaWindowing { val windowBy: String = "eventTime" }
case class CustomTime(column: String) extends KafkaWindowing { val windowBy: String = s"custom($column)" }
object KafkaWindowing {
private val customPattern = """^custom\((.+)\)$""".r
def apply(s: String): KafkaWindowing = s match {
case "processingTime" => ProcessingTime
case "eventTime" => EventTime
case custom if customPattern.pattern.matcher(custom).matches() =>
val customPattern(columnName) = custom
val _ = idParser.parseTableIdentifier(columnName) // verify if columnName is a valid Spark SQL identifier
CustomTime(columnName)
case other => throw new IllegalArgumentException(s"Wrong Kafka windowing type: $other")
}
}


/**
* Supported file types for FileTypeLoadCheck
*/
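A minimal usage sketch of the KafkaWindowing parser added above (not part of the diff; it assumes these definitions live in ru.raiffeisen.checkita.config.Enums as shown, and "msg_ts" is only an illustrative column name):

import ru.raiffeisen.checkita.config.Enums._

val byProcessing: KafkaWindowing = KafkaWindowing("processingTime")  // ProcessingTime
val byEvent: KafkaWindowing      = KafkaWindowing("eventTime")       // EventTime
val byColumn: KafkaWindowing     = KafkaWindowing("custom(msg_ts)")  // CustomTime("msg_ts"); idParser validates the column name
// KafkaWindowing("weekly") would throw IllegalArgumentException: "Wrong Kafka windowing type: weekly"

// writing back to a configuration string is symmetric:
val asString: String = byColumn.windowBy                             // "custom(msg_ts)"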
@@ -1,45 +1,28 @@
package ru.raiffeisen.checkita.config

import org.apache.spark.sql.Column
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.storage.StorageLevel
import pureconfig.generic.{FieldCoproductHint, ProductHint}
import pureconfig.{CamelCase, ConfigConvert, ConfigFieldMapping}
import ru.raiffeisen.checkita.config.Enums._
import ru.raiffeisen.checkita.config.Parsers.idParser
import ru.raiffeisen.checkita.config.RefinedTypes.{DateFormat, ID}
import ru.raiffeisen.checkita.config.jobconf.Outputs.FileOutputConfig
import ru.raiffeisen.checkita.config.jobconf.Schemas.SchemaConfig
import ru.raiffeisen.checkita.config.jobconf.Sources.{FileSourceConfig, VirtualSourceConfig}
import ru.raiffeisen.checkita.utils.SparkUtils.{toDataType, toStorageLvlString}
import ru.raiffeisen.checkita.utils.SparkUtils.{DurationOps, toDataType, toStorageLvlString}

import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.TimeZone
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration.Duration

/**
* Implicit pureconfig hints and converters for specific types used in Job Configuration.
*/
object Implicits {

/** SparkSqlParser used to validate IDs
* @note SparkSqlParser has evolved in Spark 3.1 to use active SQLConf.
* Thus, its constructor became parameterless.
* Therefore, in order to instantiate SparkSqlParser it is required
* to get constructor specific to current Spark version.
*/
private val idParser = {
val parserCls = classOf[SparkSqlParser]
(Try(parserCls.getConstructor()), Try(parserCls.getConstructor(classOf[SQLConf]))) match {
case (Success(constructor), Failure(_)) => constructor.newInstance()
case (Failure(_), Success(constructor)) => constructor.newInstance(new SQLConf)
case _ => throw new NoSuchMethodException("Unable to construct Spark SQL Parser")
}
}

implicit def hint[A]: ProductHint[A] = ProductHint[A](
ConfigFieldMapping(CamelCase, CamelCase),
allowUnknownKeys = false
@@ -55,10 +38,7 @@ object Implicits {
)

implicit val dateFormatConverter: ConfigConvert[DateFormat] =
ConfigConvert[String].xmap[DateFormat](
pattern => DateFormat(pattern, DateTimeFormatter.ofPattern(pattern)),
dateFormat => dateFormat.pattern
)
ConfigConvert[String].xmap[DateFormat](DateFormat.fromString, _.pattern)

implicit val timeZoneConverter: ConfigConvert[ZoneId] =
ConfigConvert[String].xmap[ZoneId](
@@ -69,12 +49,18 @@
implicit val dqStorageTypeConverter: ConfigConvert[DQStorageType] =
ConfigConvert[String].xmap[DQStorageType](DQStorageType.withNameInsensitive, _.toString.toLowerCase)

implicit val dqDataTypeReader: ConfigConvert[DataType] =
implicit val dqDataTypeConverter: ConfigConvert[DataType] =
ConfigConvert[String].xmap[DataType](
toDataType,
t => t.toString.toLowerCase.replace("type", "")
)

implicit val durationTypeConverter: ConfigConvert[Duration] =
ConfigConvert[String].xmap[Duration](Duration(_), _.toShortString)

implicit val kafkaWindowingConverter: ConfigConvert[KafkaWindowing] =
ConfigConvert[String].xmap[KafkaWindowing](KafkaWindowing(_), _.windowBy)

private val dropRight: String => String => String =
dropText => s => s.dropRight(dropText.length).zipWithIndex.map(t => if (t._2 == 0) t._1.toLower else t._1).mkString

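The two new converters reuse the xmap pattern already used in this file: the configuration value is read as a string, mapped to the richer type, and written back through the inverse function. A rough sketch of the round trip for durations (the exact output of DurationOps.toShortString from SparkUtils and the HOCON fragment are assumptions, shown for illustration only):

import scala.concurrent.duration.Duration

// reading: config strings such as the ones below are parsed with Duration(_)
//   streaming { trigger = "10s", window = "10m", watermark = "5m" }
val trigger: Duration   = Duration("10s")   // 10 seconds
val window: Duration    = Duration("10m")   // 10 minutes
val watermark: Duration = Duration("5m")    // 5 minutes

// writing: presumably rendered back as "10s" / "10m" / "5m" by toShortString,
// so a loaded and re-saved configuration stays stable.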
@@ -1,12 +1,31 @@
package ru.raiffeisen.checkita.config

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf
import pureconfig.error.{ConfigReaderFailure, ConvertFailure}

import java.io.{File, FileNotFoundException, InputStreamReader}
import scala.util.{Failure, Success, Try}

object Parsers {

/** SparkSqlParser used to validate IDs
*
* @note SparkSqlParser has evolved in Spark 3.1 to use active SQLConf.
* Thus, its constructor became parameterless.
* Therefore, in order to instantiate SparkSqlParser it is required
* to get constructor specific to current Spark version.
*/
val idParser: SparkSqlParser = {
val parserCls = classOf[SparkSqlParser]
(Try(parserCls.getConstructor()), Try(parserCls.getConstructor(classOf[SQLConf]))) match {
case (Success(constructor), Failure(_)) => constructor.newInstance()
case (Failure(_), Success(constructor)) => constructor.newInstance(new SQLConf)
case _ => throw new NoSuchMethodException("Unable to construct Spark SQL Parser")
}
}

/**
* Type class for Config parsers
* @tparam T Type of the input from which the config should be parsed.
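The relocated idParser is now also reused by KafkaWindowing (see Enums above) to validate user-supplied column names. A small sketch of the validation it performs, assuming standard Spark SQL parser behaviour:

import ru.raiffeisen.checkita.config.Parsers.idParser

idParser.parseTableIdentifier("event_ts")         // ok: a valid Spark SQL identifier
idParser.parseTableIdentifier("`weird column`")   // ok: back-quoted identifiers are accepted
// idParser.parseTableIdentifier("not a column")  // throws ParseException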
@@ -57,4 +57,7 @@ object RefinedTypes {
* @param formatter - datetime formatter for pattern
*/
case class DateFormat(pattern: String, @transient formatter: DateTimeFormatter)
object DateFormat {
def fromString(s: String): DateFormat = DateFormat(s, DateTimeFormatter.ofPattern(s))
}
}
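DateFormat.fromString centralizes construction of the pattern/formatter pair that was previously duplicated at call sites (see the DateTimeConfig change below). A short usage sketch:

import java.time.LocalDateTime
import ru.raiffeisen.checkita.config.RefinedTypes.DateFormat

val fmt = DateFormat.fromString("yyyy-MM-dd'T'HH:mm:ss.SSS")
fmt.pattern                               // "yyyy-MM-dd'T'HH:mm:ss.SSS"
fmt.formatter.format(LocalDateTime.now()) // e.g. "2023-11-24T12:00:00.000"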
@@ -18,6 +18,7 @@ final case class AppConfig(
storage: Option[StorageConfig],
email: Option[EmailConfig],
mattermost: Option[MattermostConfig],
streaming: StreamConfig = StreamConfig(),
dateTimeOptions: DateTimeConfig = DateTimeConfig(),
enablers: Enablers = Enablers(),
defaultSparkOptions: Seq[SparkParam] = Seq.empty,
@@ -13,12 +13,6 @@ import ru.raiffeisen.checkita.config.RefinedTypes.DateFormat
*/
case class DateTimeConfig(
timeZone: ZoneId = ZoneId.of("UTC"),
referenceDateFormat: DateFormat = DateFormat(
pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS",
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
),
executionDateFormat: DateFormat = DateFormat(
pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS",
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
)
referenceDateFormat: DateFormat = DateFormat.fromString("yyyy-MM-dd'T'HH:mm:ss.SSS"),
executionDateFormat: DateFormat = DateFormat.fromString("yyyy-MM-dd'T'HH:mm:ss.SSS")
)
@@ -0,0 +1,20 @@
package ru.raiffeisen.checkita.config.appconf

import scala.concurrent.duration.Duration
import java.util.UUID

/**
* Application-level configuration describing streaming settings
* @param trigger Trigger interval: defines the time interval over which micro-batches are collected.
* @param window Window interval: defines the tumbling window size used to accumulate metrics.
* @param watermark Watermark level: defines the time interval after which late records are no longer processed.
*/
case class StreamConfig(
trigger: Duration = Duration("10s"),
window: Duration = Duration("10m"),
watermark: Duration = Duration("5m")
) {
// random column names are generated to be used for windowing:
lazy val windowTsCol: String = UUID.randomUUID.toString.replace("-", "")
lazy val eventTsCol: String = UUID.randomUUID.toString.replace("-", "")
}
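The three durations map naturally onto Spark Structured Streaming primitives: a processing-time trigger, a watermark delay, and a tumbling window. The sketch below is an assumption about how a stream reader could wire them in, not the actual reader code from this PR; it presumes the DataFrame already carries a timestamp column named by eventTsCol:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger
import ru.raiffeisen.checkita.config.appconf.StreamConfig

def applyStreamSettings(df: DataFrame, conf: StreamConfig): (DataFrame, Trigger) = {
  val windowed = df
    .withWatermark(conf.eventTsCol, conf.watermark.toString)                          // "5 minutes"
    .withColumn(conf.windowTsCol, window(col(conf.eventTsCol), conf.window.toString)) // tumbling "10 minutes" windows
  (windowed, Trigger.ProcessingTime(conf.trigger.toMillis))                           // micro-batches every 10 seconds
}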
@@ -1,7 +1,7 @@
package ru.raiffeisen.checkita.config.jobconf

import eu.timepit.refined.types.string.NonEmptyString
import ru.raiffeisen.checkita.config.RefinedTypes.URI
import ru.raiffeisen.checkita.config.RefinedTypes.{ID, URI}

/**
* @note General note on working with files in Checkita Framework:
@@ -19,9 +19,11 @@ object Files {
/**
* Base trait for all types of configurations that refer to a file.
* All such configurations must contain a path to a file.
* In addition, an optional schema ID can be provided to point to the schema used for reading the data.
*/
trait FileConfig {
val path: URI
val schema: Option[ID]
}

/**
@@ -37,9 +39,13 @@
/**
* Base trait for Avro files. Configuration for Avro files may contain a reference to an Avro schema ID.
*/
trait AvroFileConfig extends FileConfig {
val schema: Option[NonEmptyString]
}
trait AvroFileConfig extends FileConfig

/**
* Base trait for fixed-width text files.
* For such files, the configuration must contain an explicit reference to a fixed (full or short) schema ID.
*/
trait FixedFileConfig extends FileConfig

/**
* Base trait for delimited text files such as CSV or TSV.
@@ -55,14 +61,6 @@
val quote: NonEmptyString
val escape: NonEmptyString
val header: Boolean
val schema: Option[NonEmptyString]
}

/**
* Base trait for fixed-width text files.
* For such files configuration must contain reference explicit fixed (full or short) schema ID
*/
trait FixedFileConfig extends FileConfig {
val schema: NonEmptyString
}
}
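With this restructuring, every FileConfig exposes schema: Option[ID]; Avro and delimited files treat it as optional, while fixed-width files are expected to supply it. A hypothetical concrete source configuration illustrating the contract (the case class and its id field are illustrative only, not taken from this PR):

import ru.raiffeisen.checkita.config.RefinedTypes.{ID, URI}
import ru.raiffeisen.checkita.config.jobconf.Files.FixedFileConfig

final case class FixedFileSourceExample(
  id: ID,                 // source identifier (illustrative)
  path: URI,              // path to the fixed-width file
  schema: Option[ID]      // schema reference; expected to be set for fixed-width files
) extends FixedFileConfig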
@@ -1,20 +1,21 @@
package ru.raiffeisen.checkita.config.jobconf

import ru.raiffeisen.checkita.config.RefinedTypes.{ID, SparkParam}
import ru.raiffeisen.checkita.config.RefinedTypes.ID
import ru.raiffeisen.checkita.config.jobconf.Checks.ChecksConfig
import ru.raiffeisen.checkita.config.jobconf.Connections.ConnectionsConfig
import ru.raiffeisen.checkita.config.jobconf.LoadChecks.LoadChecksConfig
import ru.raiffeisen.checkita.config.jobconf.Metrics.MetricsConfig
import ru.raiffeisen.checkita.config.jobconf.Schemas.SchemaConfig
import ru.raiffeisen.checkita.config.jobconf.Sources.{SourcesConfig, VirtualSourceConfig}
import ru.raiffeisen.checkita.config.jobconf.Sources.{SourcesConfig, StreamSourcesConfig, VirtualSourceConfig}
import ru.raiffeisen.checkita.config.jobconf.Targets.TargetsConfig

/**
* Data Quality job-level configuration
* @param jobId Job ID
* @param connections Connections to external data systems (RDBMS, Message Brokers, etc.)
* @param schemas Various schema definitions
* @param sources Data sources processed within current job.
* @param sources Data sources processed within current job (only applicable to batch jobs).
* @param streams Stream sources processed within current job (only applicable to streaming jobs).
* @param virtualSources Virtual sources to be created from basic sources
* @param loadChecks Load checks to be performed on data sources before reading data itself
* @param metrics Metrics to be calculated for data sources
Expand All @@ -26,6 +27,7 @@ final case class JobConfig(
connections: Option[ConnectionsConfig],
schemas: Seq[SchemaConfig] = Seq.empty,
sources: Option[SourcesConfig],
streams: Option[StreamSourcesConfig],
virtualSources: Seq[VirtualSourceConfig] = Seq.empty,
loadChecks: Option[LoadChecksConfig],
metrics: Option[MetricsConfig],
@@ -5,7 +5,7 @@ import eu.timepit.refined.api.Refined
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.types.string.NonEmptyString
import ru.raiffeisen.checkita.config.Enums.TemplateFormat
import ru.raiffeisen.checkita.config.RefinedTypes.{Email, MMRecipient, SparkParam, URI}
import ru.raiffeisen.checkita.config.RefinedTypes.{Email, ID, MMRecipient, SparkParam, URI}
import ru.raiffeisen.checkita.config.jobconf.Files._


@@ -45,7 +45,7 @@ object Outputs {
header: Boolean = false
) extends FileOutputConfig with DelimitedFileConfig {
// Schema is not required as input parameter as it is enforced on write.
val schema: Option[NonEmptyString] = None
val schema: Option[ID] = None
}

/**
@@ -56,7 +56,7 @@
final case class AvroFileOutputConfig(
path: URI
) extends FileOutputConfig with AvroFileConfig {
val schema: Option[NonEmptyString] = None
val schema: Option[ID] = None
}

/**
@@ -66,7 +66,9 @@
*/
final case class OrcFileOutputConfig(
path: URI
) extends FileOutputConfig with OrcFileConfig
) extends FileOutputConfig with OrcFileConfig {
val schema: Option[ID] = None
}

/**
* Parquet file output configuration
@@ -75,7 +77,9 @@
*/
final case class ParquetFileOutputConfig(
path: URI
) extends FileOutputConfig with ParquetFileConfig
) extends FileOutputConfig with ParquetFileConfig {
val schema: Option[ID] = None
}


/**