feat: added persist for regular sources (#63)
- added the ability to persist a regular source if needed
dmrmlvv authored Oct 24, 2024
1 parent 1af4105 commit 8070fc1
Showing 5 changed files with 72 additions and 41 deletions.
@@ -17,11 +17,14 @@ object Sources {
* Base class for all source configurations.
* All sources are described as DQ entities that might have an optional sequence of keyFields
* which will uniquely identify data row in error collection reports.
* In addition, it should be indicated whether this source is streamable or not.
* It should also be indicated whether this source is streamable or not.
* Additionally, the persist field specifies an optional storage level
* used to persist the source during job execution.
*/
sealed abstract class SourceConfig extends JobConfigEntity {
val keyFields: Seq[NonEmptyString]
val streamable: Boolean
val persist: Option[StorageLevel]
}
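The new persist field reuses Spark's standard storage levels. As a minimal sketch of the idea, assuming the level arrives as a plain string from configuration (applyPersist and its signature are illustrative, not the framework's API):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.storage.StorageLevel

  // Hypothetical helper: resolve an optional storage-level name such as "MEMORY_ONLY"
  // or "MEMORY_AND_DISK" and persist the dataframe; an absent value leaves it untouched.
  def applyPersist(df: DataFrame, level: Option[String]): DataFrame =
    level.map(StorageLevel.fromString).map(lvl => df.persist(lvl)).getOrElse(df)

StorageLevel.fromString accepts the same names Spark uses elsewhere (DISK_ONLY, MEMORY_AND_DISK_SER, and so on), so a misspelled level fails fast with an IllegalArgumentException.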

/**
@@ -32,6 +35,7 @@ object Sources {
* @param connection Connection ID (must be JDBC connection)
* @param table Table to read
* @param query Query to execute
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param keyFields Sequence of key fields (columns that identify data row)
* @param metadata List of metadata parameters specific to this source
* @note Either table to read or query to execute must be defined but not both.
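The table/query exclusivity noted above is a plain XOR check. A self-contained illustration of such a validation (validateTableOrQuery is a hypothetical name, not the framework's code):

  // Exactly one of `table` and `query` must be defined, never both and never neither.
  def validateTableOrQuery(table: Option[String], query: Option[String]): Either[String, Unit] =
    if (table.isDefined ^ query.isDefined) Right(())
    else Left("Either table to read or query to execute must be defined, but not both.")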
@@ -42,6 +46,7 @@ object Sources {
connection: ID,
table: Option[NonEmptyString],
query: Option[NonEmptyString],
persist: Option[StorageLevel],
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
) extends SourceConfig {
@@ -68,6 +73,7 @@ object Sources {
* @param description Source description
* @param schema Hive schema
* @param table Hive table
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param partitions Sequence of partitions to read.
* The order of partition columns should correspond to order in which
* partition columns are defined in hive table DDL.
@@ -79,6 +85,7 @@ object Sources {
description: Option[NonEmptyString],
schema: NonEmptyString,
table: NonEmptyString,
persist: Option[StorageLevel],
partitions: Seq[HivePartition] = Seq.empty,
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
@@ -98,6 +105,7 @@ object Sources {
* If none is set, then "earliest" is used in batch jobs and "latest" is used in streaming jobs.
* @param endingOffsets Json-string defining ending offset. Applicable only to batch jobs.
* If none is set then "latest" is used.
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param windowBy Source of timestamp used to build windows. Applicable only for streaming jobs!
* Default: processingTime - uses current timestamp at the moment when Spark processes row.
* Other options are:
@@ -127,6 +135,7 @@ object Sources {
topicPattern: Option[NonEmptyString],
startingOffsets: Option[NonEmptyString], // earliest for batch, latest for stream
endingOffsets: Option[NonEmptyString], // latest for batch, ignored for stream.
persist: Option[StorageLevel],
windowBy: StreamWindowing = ProcessingTime,
keyFormat: KafkaTopicFormat = KafkaTopicFormat.String,
valueFormat: KafkaTopicFormat = KafkaTopicFormat.String,
@@ -147,6 +156,7 @@ object Sources {
* @param description Source description
* @param connection Connection ID (must be pivotal connection)
* @param table Table to read
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param keyFields Sequence of key fields (columns that identify data row)
* @param metadata List of metadata parameters specific to this source
*/
@@ -155,6 +165,7 @@ object Sources {
description: Option[NonEmptyString],
connection: ID,
table: Option[NonEmptyString],
persist: Option[StorageLevel],
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
) extends SourceConfig {
@@ -177,6 +188,7 @@ object Sources {
* @param description Source description
* @param path Path to file
* @param schema Schema ID (must be either fixedFull or fixedShort schema)
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param windowBy Source of timestamp used to build windows. Applicable only for streaming jobs!
* Default: processingTime - uses current timestamp at the moment when Spark processes row.
* Other options are:
@@ -191,6 +203,7 @@ object Sources {
description: Option[NonEmptyString],
path: URI,
schema: Option[ID],
persist: Option[StorageLevel],
windowBy: StreamWindowing = ProcessingTime,
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
@@ -209,6 +222,7 @@ object Sources {
* @param escape Escape symbol (default: \)
* @param header Boolean flag indicating whether schema should be read from file header (default: false)
* @param schema Schema ID (only if header = false)
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param windowBy Source of timestamp used to build windows. Applicable only for streaming jobs!
* Default: processingTime - uses current timestamp at the moment when Spark processes row.
* Other options are:
@@ -223,6 +237,7 @@ object Sources {
description: Option[NonEmptyString],
path: URI,
schema: Option[ID],
persist: Option[StorageLevel],
delimiter: NonEmptyString = ",",
quote: NonEmptyString = "\"",
escape: NonEmptyString = "\\",
@@ -241,6 +256,7 @@ object Sources {
* @param description Source description
* @param path Path to file
* @param schema Schema ID
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param windowBy Source of timestamp used to build windows. Applicable only for streaming jobs!
* Default: processingTime - uses current timestamp at the moment when Spark processes row.
* Other options are:
@@ -255,6 +271,7 @@ object Sources {
description: Option[NonEmptyString],
path: URI,
schema: Option[ID],
persist: Option[StorageLevel],
windowBy: StreamWindowing = ProcessingTime,
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
@@ -269,6 +286,7 @@ object Sources {
* @param description Source description
* @param path Path to file
* @param schema Schema ID
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param windowBy Source of timestamp used to build windows. Applicable only for streaming jobs!
* Default: processingTime - uses current timestamp at the moment when Spark processes row.
* Other options are:
@@ -283,6 +301,7 @@ object Sources {
description: Option[NonEmptyString],
path: URI,
schema: Option[ID],
persist: Option[StorageLevel],
windowBy: StreamWindowing = ProcessingTime,
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
@@ -297,6 +316,7 @@ object Sources {
* @param description Source description
* @param path Path to file
* @param schema Schema ID
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param windowBy Source of timestamp used to build windows. Applicable only for streaming jobs!
* Default: processingTime - uses current timestamp at the moment when Spark processes row.
* Other options are:
@@ -311,6 +331,7 @@ object Sources {
description: Option[NonEmptyString],
path: URI,
schema: Option[ID],
persist: Option[StorageLevel],
windowBy: StreamWindowing = ProcessingTime,
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
@@ -327,6 +348,7 @@ object Sources {
* @param format Source format to set in spark reader.
* @param path Path to load the source from (if required)
* @param schema Explicit schema applied to source data (if required)
* @param persist Spark storage level used to persist the dataframe during job execution.
* @param options List of additional spark options required to read the source (if any)
* @param keyFields Sequence of key fields (columns that identify data row)
* @param metadata List of metadata parameters specific to this source
@@ -337,6 +359,7 @@ object Sources {
format: NonEmptyString,
path: Option[URI],
schema: Option[ID],
persist: Option[StorageLevel],
options: Seq[SparkParam] = Seq.empty,
keyFields: Seq[NonEmptyString] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
@@ -347,12 +370,10 @@ object Sources {
/**
* Base class for all virtual source configurations.
* In addition to basic source configuration,
* virtual sources might have following optional parameters:
* - spark persist storage level in order to persist virtual source during job execution
* virtual sources might have the following optional parameter:
* - save configuration in order to save virtual source as a file.
*/
sealed abstract class VirtualSourceConfig extends SourceConfig {
val persist: Option[StorageLevel]
val save: Option[FileOutputConfig]
val parents: Seq[String]
val windowBy: Option[StreamWindowing]
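With persist promoted from VirtualSourceConfig to the base SourceConfig, regular and virtual sources can be inspected uniformly. A hedged sketch, assuming the surrounding Sources definitions are in scope (requestedLevels is illustrative, not part of the framework):

  import org.apache.spark.storage.StorageLevel

  // Collect the storage level requested by each source, regular or virtual,
  // now that `persist` lives on SourceConfig itself.
  def requestedLevels(sources: Seq[SourceConfig]): Map[String, StorageLevel] =
    sources.flatMap(src => src.persist.map(lvl => src.id.value -> lvl)).toMap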
@@ -275,6 +275,11 @@ class DQContext(settings: AppSettings, spark: SparkSession, fs: FileSystem) exte

reduceToMap(sources.map{ srcConf =>
log.info(s"$sourceStage Reading source '${srcConf.id.value}'...")
if (srcConf.persist.nonEmpty && !readAsStream) {
log.info(
s"$sourceStage Persisting source '${srcConf.id.value}' to ${srcConf.persist.get.toString}."
)
}
val source = if (readAsStream) srcConf.readStream else srcConf.read
source.mapValue(s => Seq(s.id -> s))
.tap(_ => log.info(s"$sourceStage Success!")) // immediate logging of success state
@@ -332,6 +337,11 @@ class DQContext(settings: AppSettings, spark: SparkSession, fs: FileSystem) exte
curVsAndRest <- Try(getNextVS(vs, parents)).toResult()
newSource <- {
log.info(s"$virtualSourceStage Reading virtual source '${curVsAndRest._1.id.value}'...")
if (curVsAndRest._1.persist.nonEmpty && !readAsStream) {
log.info(
s"$virtualSourceStage Persisting virtual source '${curVsAndRest._1.id.value}' to ${curVsAndRest._1.persist.get.toString}."
)
}
(if (readAsStream) curVsAndRest._1.readStream(parents) else curVsAndRest._1.read(parents))
.tap(_ => log.info(s"$virtualSourceStage Success!")) // immediate logging of success state
.tap(s => log.debug(s.df.schema.treeString)) // debug source schema
@@ -349,10 +359,6 @@ class DQContext(settings: AppSettings, spark: SparkSession, fs: FileSystem) exte
} // save virtual source if save option is configured.
.mapLeft(_.map(e => s"$virtualSourceStage $e")) // update error messages with running stage
}
_ <- Try(curVsAndRest._1.persist.foreach{ sLvl =>
log.info(s"$virtualSourceStage Persisting virtual source '${curVsAndRest._1.id.value}' to ${sLvl.toString}.")
newSource.df.persist(sLvl)
}).toResult()
} yield newSource -> curVsAndRest._2

val updatedVirtualSource = newAndRest.map(_._2)
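Note that DQContext now only announces the persist request, and only for batch reads; the actual df.persist call moved into the source readers, which is why the explicit persist block for virtual sources above was removed. A small self-contained sketch of that guard (persistMessage is a hypothetical name):

  import org.apache.spark.storage.StorageLevel

  // Produce the log message only when a level is configured and the job reads in batch mode;
  // streaming reads ignore the persist setting entirely.
  def persistMessage(sourceId: String, persist: Option[StorageLevel], readAsStream: Boolean): Option[String] =
    persist.filterNot(_ => readAsStream).map(lvl => s"Persisting source '$sourceId' to $lvl.")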
@@ -36,11 +36,15 @@ object SourceReaders {
*
* @param config Source configuration
* @param df Spark Dataframe
* @param readMode Mode in which source is read. Either 'batch' or 'stream'
* @param checkpoint Initial source checkpoint (applicable only to streaming sources)
* @return Source
*/
protected def toSource(config: T, df: DataFrame, checkpoint: Option[Checkpoint] = None)
(implicit settings: AppSettings): Source = {
protected def toSource(config: T,
df: DataFrame,
readMode: ReadMode,
checkpoint: Option[Checkpoint] = None)(implicit settings: AppSettings): Source = {
if (config.persist.nonEmpty && readMode == ReadMode.Batch) df.persist(config.persist.get)
Source.validated(
config.id.value, df, config.keyFields.map(_.value), checkpoint = checkpoint
)(settings.enableCaseSensitivity)
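The persist call itself now happens once, inside toSource, and only for batch reads. A self-contained sketch of that guard with a simplified stand-in for the framework's ReadMode (maybePersist and Mode are illustrative names):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.storage.StorageLevel

  sealed trait Mode                    // simplified stand-in for the framework's ReadMode
  case object Batch extends Mode
  case object Stream extends Mode

  // Persist only when a level is configured and the source is read in batch mode;
  // streaming dataframes are returned unchanged.
  def maybePersist(df: DataFrame, level: Option[StorageLevel], mode: Mode): DataFrame = {
    if (mode == Batch) level.foreach(lvl => df.persist(lvl))
    df
  }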
@@ -196,7 +200,7 @@ object SourceReaders {
require(conn.isInstanceOf[JdbcConnection[_]], s"Table source '${config.id.value}' refers to a non-Jdbc connection.")

val df = conn.asInstanceOf[JdbcConnection[_]].loadDataFrame(config)
toSource(config, df)
toSource(config, df, readMode)
}
}

@@ -237,14 +241,14 @@ object SourceReaders {

readMode match {
case ReadMode.Batch =>
toSource(config, kafkaConn.loadDataFrame(config), Some(Checkpoint.init(config)))
toSource(config, kafkaConn.loadDataFrame(config), readMode, Some(Checkpoint.init(config)))
case ReadMode.Stream =>
val checkpoint = checkpoints.get(config.id.value)
.map(_.asInstanceOf[KafkaCheckpoint])
.map(chk => kafkaConn.validateOrFixCheckpoint(chk, config))
.getOrElse(kafkaConn.initCheckpoint(config))
val df = conn.asInstanceOf[KafkaConnection].loadDataStream(config, checkpoint)
toSource(config, df, Some(checkpoint))
toSource(config, df, readMode, Some(checkpoint))
}
}
}
@@ -284,7 +288,7 @@ object SourceReaders {
s"Table source '${config.id.value}' refers to not pivotal greenplum connection.")

val df = conn.asInstanceOf[PivotalConnection].loadDataFrame(config)
toSource(config, df)
toSource(config, df, readMode)
}
}

@@ -325,7 +329,7 @@ object SourceReaders {
}.reduce(_ && _)
)
} else preDf
toSource(config, df)
toSource(config, df, readMode)
}
}

@@ -402,7 +406,7 @@ object SourceReaders {
val df = rawDf.map(c => getRow(c.getString(0), sourceSchema.columnWidths)).select(
sourceSchema.schema.map(f => col(f.name).cast(f.dataType)): _*
)
toSource(config, if (readMode == ReadMode.Batch) df else df.prepareStream(config.windowBy))
if (readMode == ReadMode.Batch) toSource(config, df, readMode) else toSource(config, df.prepareStream(config.windowBy), readMode)
}
}

@@ -464,8 +468,8 @@ object SourceReaders {
)
}
} else throw new FileNotFoundException(s"Delimited text file or directory not found: ${config.path.value}")
toSource(config, if (readMode == ReadMode.Batch) df else df.prepareStream(config.windowBy))

if (readMode == ReadMode.Batch) toSource(config, df, readMode) else toSource(config, df.prepareStream(config.windowBy), readMode)
}
}

@@ -495,7 +499,7 @@ object SourceReaders {
schemas: Map[String, SourceSchema],
connections: Map[String, DQConnection],
checkpoints: Map[String, Checkpoint]): Source =
toSource(config, fileReader(readMode, config.path.value, "Avro", config.schema.map(_.value), config.windowBy))
toSource(config, fileReader(readMode, config.path.value, "Avro", config.schema.map(_.value), config.windowBy), readMode)
}

/**
@@ -523,7 +527,7 @@ object SourceReaders {
schemas: Map[String, SourceSchema],
connections: Map[String, DQConnection],
checkpoints: Map[String, Checkpoint]): Source =
toSource(config, fileReader(readMode, config.path.value, "Parquet", config.schema.map(_.value), config.windowBy))
toSource(config, fileReader(readMode, config.path.value, "Parquet", config.schema.map(_.value), config.windowBy), readMode)
}

/**
@@ -550,7 +554,7 @@ object SourceReaders {
schemas: Map[String, SourceSchema],
connections: Map[String, DQConnection],
checkpoints: Map[String, Checkpoint]): Source =
toSource(config, fileReader(readMode, config.path.value, "ORC", config.schema.map(_.value), config.windowBy))
toSource(config, fileReader(readMode, config.path.value, "ORC", config.schema.map(_.value), config.windowBy), readMode)
}

implicit object CustomSourceReader extends SourceReader[CustomSource] {
@@ -590,7 +594,7 @@ object SourceReaders {
config.path.map(_.value).map(p => reader.load(p)).getOrElse(reader.load())
}

toSource(config, df)
toSource(config, df, readMode)
}
}
