fix: expression checks calculation for streaming jobs
- Fix check processing for streaming jobs to enable expression check evaluation.
- Fix a minor bug to remove duplicates from expression check source references.
- Update documentation to include information about check failure tolerance and persist options for regular sources.
gabb1er committed Nov 12, 2024
1 parent 886d29c commit 0bf3cde
Showing 10 changed files with 90 additions and 14 deletions.
@@ -16,7 +16,9 @@ import org.checkita.dqf.config.Enums.{CheckFailureTolerance, MetricEngineAPI}
  * @param errorDumpSize Maximum number of errors to be collected per single metric per partition.
  * @param outputRepartition Sets the number of partitions when writing outputs. By default writes single file.
  * @param metricEngineAPI Metric processor API used to process metrics: either Spark RDD or Spark DF.
- * @param checkFailureTolerance Returns the failure status if any of the checks fail.
+ * @param checkFailureTolerance Sets check failure tolerance for the application, i.e.
+ *                              whether the application should return a non-zero exit code when some of the checks
+ *                              have failed.
  */
 final case class Enablers(
     allowSqlQueries: Boolean = false,
@@ -531,7 +531,7 @@ class DQContext(settings: AppSettings, spark: SparkSession, fs: FileSystem) exte
     settings.streamConfig.checkpointDir match {
       case Some(dir) =>
         log.info(s"$checkpointStage Reading from checkpoint directory: ${dir.value}")
-        log.info(s"$checkpointStage Searching checkpoint for jobId = '$jobId' and jobHas = '$jh'")
+        log.info(s"$checkpointStage Searching checkpoint for jobId = '$jobId' and jobHash = '$jh'")
         CheckpointIO.readCheckpoint(dir.value, jobId, jh)
       case None =>
         log.info(s"$checkpointStage Checkpoint directory is not set.")
@@ -5,7 +5,7 @@ import org.apache.spark.sql.SparkSession
 import org.checkita.dqf.appsettings.AppSettings
 import org.checkita.dqf.config.ConfigEncryptor
 import org.checkita.dqf.config.IO.{RenderOptions, writeEncryptedJobConfig, writeJobConfig}
-import org.checkita.dqf.config.jobconf.Checks.{CheckConfig, SnapshotCheckConfig, TrendCheckConfig}
+import org.checkita.dqf.config.jobconf.Checks.{CheckConfig, ExpressionCheck, SnapshotCheckConfig, TrendCheckConfig}
 import org.checkita.dqf.config.jobconf.JobConfig
 import org.checkita.dqf.config.jobconf.LoadChecks.LoadCheckConfig
 import org.checkita.dqf.config.jobconf.Metrics.{ComposedMetricConfig, RegularMetricConfig, TopNMetricConfig, TrendMetricConfig}
@@ -294,12 +294,13 @@ trait DQJob extends Logging {

     val filterOutMissingMetricRefs = (allChecks: Seq[CheckConfig]) =>
       if (settings.streamConfig.allowEmptyWindows) allChecks.filter{ chk =>
-        val metricId = chk.metric.value
-        val compareMetricId = chk match {
-          case config: SnapshotCheckConfig => config.compareMetric.map(_.value)
-          case _ => None
+        val metricRefs = chk match {
+          case snapshotCheck: SnapshotCheckConfig =>
+            snapshotCheck.compareMetric.map(_.value).toSeq :+ snapshotCheck.metric.value
+          case trendCheck: TrendCheckConfig => Seq(trendCheck.metric.value)
+          case expressionCheck: ExpressionCheck => getTokens(expressionCheck.formula.value)
         }
-        val predicate = (compareMetricId.toSeq :+ metricId).forall(metResults.contains)
+        val predicate = metricRefs.forall(metResults.contains)
         if (!predicate) log.warn(
           s"$stage Didn't got all required metric results for check '${chk.id.value}'. " +
           "Streaming configuration parameter 'allowEmptyWindows' is set to 'true'. " +
@@ -163,7 +163,7 @@ case class ExpressionCheckCalculator(checkId: String, formula: String) extends C
       case Left(errs) => resultOnError(errs)
       case Right(results) =>
         val resMap = results.map{ case (k, v) => k -> v.result.toString }
-        val srcIds = results.values.flatMap(_.sourceIds).toSeq
+        val srcIds = results.values.flatMap(_.sourceIds).toSeq.distinct
         Try {
           val clearFormula = prepareFormula(formula)
           val formulaWithValues = renderTemplate(clearFormula, resMap)
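
The `.distinct` added above deduplicates source references when an expression check formula refers to several metrics computed over the same source. For illustration, a hedged sketch of such an expression check in job-configuration HOCON; the parameter names follow the Checks documentation changed below, the metric IDs are reused from its example, and the exact formula syntax and nesting under `checks` are assumptions:

checks: {
  expression: [
    {
      id: "compl_vs_nulls"                      # assumed check ID
      description: "completeness must exceed the share of nulls"   # assumed description
      formula: "orc_data_compl > pct_of_null"   # refers to two metric results; both may come from the same source
      isCritical: true
    }
  ]
}
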
4 changes: 4 additions & 0 deletions docs/01-application-setup/01-ApplicationSettings.md
@@ -69,6 +69,10 @@ that controls various aspects of data quality job execution:
 * `metricEngineAPI` - Sets engine to be used for regular metric processing: `rdd` (RDD-engine) or `df` (DF-engine) are
   available. It is recommended to use DF-engine for batch applications while streaming applications support only
   RDD-engine. *Optional, default is `rdd`*.
+* `checkFailureTolerance` - Sets check failure tolerance for the application, i.e. whether the application should
+  return a non-zero exit code when some of the checks have failed.
+  For more info, see [Check Failure Tolerance](../02-general-information/08-CheckFailureTolerance.md).
+  *Optional, default is `none`*.

 If `enablers` section is missing then default values are used for all parameters above.

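For illustration, a minimal sketch of how the new option might appear in the `enablers` block of an application configuration file (HOCON). Only `checkFailureTolerance` comes from this commit; the surrounding keys and exact nesting are assumptions based on the settings documented above:

enablers: {
  allowSqlQueries: false
  metricEngineAPI: "rdd"
  checkFailureTolerance: "critical"   # assumed values mirror the documented levels: "none", "critical", "all"
}
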
22 changes: 22 additions & 0 deletions docs/02-general-information/08-CheckFailureTolerance.md
@@ -0,0 +1,22 @@
+# Check Failure Tolerance
+
+Checkita is designed in such a way that metric and check calculators never fail.
+All errors that occur during either metric computation or check evaluation are captured and
+returned to the user via metric errors and check status messages respectively. The unified status model
+also serves to distinguish between the failure of a logical condition configured within a metric or check and
+a runtime exception that was captured during its computation.
+See [Status Model used in Results](03-StatusModel.md) for more details.
+
+From another perspective, it is sometimes quite handy to make the DQ application fail when some of the checks fail.
+For example, if a DQ job is part of an ETL process, then it is good to have a failure indication when DQ checks
+are not satisfied. In order to support such behaviour, Checkita offers different levels of check failure tolerance:
+
+* `NONE` - DQ application will always return zero exit code, even if some of the checks have failed.
+* `CRITICAL` - DQ application will return non-zero exit code only when some of the `critical` checks have failed.
+  Checks configuration supports `isCritical` boolean flag which is used to identify critical checks.
+  By default, all checks are non-critical.
+  See [Checks Configurations](../03-job-configuration/09-Checks.md) for more details.
+* `ALL` - DQ application will return non-zero exit code if any of the configured checks have failed.
+
+The check failure tolerance is set in the application configuration file as described
+in the [Enablers](../01-application-setup/01-ApplicationSettings.md#enablers) section.
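
As an illustration, a hedged sketch of how a critical check interacts with the `CRITICAL` tolerance level. The `equalTo` entry mirrors the example from the Checks documentation below; the nesting under `checks.snapshot` and the enablers fragment are assumptions:

# application settings (assumed fragment)
enablers: {checkFailureTolerance: "critical"}

# job configuration (check definition taken from the Checks documentation example)
checks: {
  snapshot: {
    equalTo: [
      {id: "zero_nulls", description: "Hive Table1 mustn't contain nulls", metric: "hive_table_nulls", threshold: 0, isCritical: true}
    ]
  }
}
# with these settings the application returns a non-zero exit code only if 'zero_nulls' (a critical check) fails
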
34 changes: 34 additions & 0 deletions docs/03-job-configuration/03-Sources.md
@@ -11,6 +11,10 @@ Checkita can read sources from external systems such as RDBMS or Kafka. For this
 connections to these systems in a first place. See [Connections Configuration](01-Connections.md) chapter for more
 details on connections configurations.

+Additionally, it is possible to cache sources in memory or on disk in order to tune application performance.
+This can be handy when a source is used as a parent for more than one virtual source.
+In such cases, caching the source avoids computing it multiple times.
+
 Thus, currently Checkita supports four general types of sources:

 * File sources: read files from local or remote file systems (HDFS, S3, etc.);
@@ -47,6 +51,11 @@ Common parameters for sources of any file type are:
   directory/bucket will be read (assuming they all have the same schema). Note, that when reading from file system which
   is not spark default file system, it is required to add FS prefix to the path, e.g. `file://` to read from local FS,
   or `s3a://` to read from S3.
+* `persist` - *Optional*. One of the allowed Spark StorageLevels used to cache sources.
+  By default, sources are not cached. Supported Spark StorageLevels are:
+  * `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`,
+    `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`,
+    `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`.
 * `keyFields` - *Optional*. List of columns that form a Primary Key or are used to identify row within a dataset.
   Key fields are primarily used in error collection reports. For more details on error collection, see
   [Metric Error Collection](../02-general-information/04-ErrorCollection.md) chapter.
@@ -119,6 +128,11 @@ In order to read data from Hive table it is required to provide following:
   * `values` - *Optional*. List of partition column name values to read.
   > **IMPORTANT**: When defining partitions to read, it is required to specify either an SQL expression to filter
   > partitions or an explicit list of partition values but not both.
+* `persist` - *Optional*. One of the allowed Spark StorageLevels used to cache sources.
+  By default, sources are not cached. Supported Spark StorageLevels are:
+  * `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`,
+    `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`,
+    `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`.
 * `keyFields` - *Optional*. List of columns that form a Primary Key or are used to identify row within a dataset.
   Key fields are primarily used in error collection reports. For more details on error collection, see
   [Metric Error Collection](../02-general-information/04-ErrorCollection.md) chapter.
@@ -141,6 +155,11 @@ supply following parameters:
   for one of the supported RDBMS. See [Connections Configuration](01-Connections.md) chapter for more information.
 * `table` - *Optional*. Table to read.
 * `query` - *Optional*. Query to execute. Query result is read as table source.
+* `persist` - *Optional*. One of the allowed Spark StorageLevels used to cache sources.
+  By default, sources are not cached. Supported Spark StorageLevels are:
+  * `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`,
+    `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`,
+    `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`.
 * `keyFields` - *Optional*. List of columns that form a Primary Key or are used to identify row within a dataset.
   Key fields are primarily used in error collection reports. For more details on error collection, see
   [Metric Error Collection](../02-general-information/04-ErrorCollection.md) chapter.
@@ -191,6 +210,11 @@ parameters:
   `failOnDataLoss, kafkaConsumer.pollTimeoutMs, fetchOffset.numRetries, fetchOffset.retryIntervalMs, maxOffsetsPerTrigger`.
   Parameters are provided as a strings in format of `parameterName=parameterValue`.
   For more information, see [Spark Kafka Integration Guide](https://spark.apache.org/docs/2.3.2/structured-streaming-kafka-integration.html).
+* `persist` - *Optional*. One of the allowed Spark StorageLevels used to cache sources.
+  By default, sources are not cached. Supported Spark StorageLevels are:
+  * `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`,
+    `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`,
+    `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`.
 * `keyFields` - *Optional*. List of columns that form a Primary Key or are used to identify row within a dataset.
   Key fields are primarily used in error collection reports. For more details on error collection, see
   [Metric Error Collection](../02-general-information/04-ErrorCollection.md) chapter.
@@ -214,6 +238,11 @@ In order to read data from Greenplum table using pivotal connector it is require
 * `connection` - *Required*. Connection ID to use for table source. Connection ID must refer to Greenplum pivotal
   connection. See [Connections Configuration](01-Connections.md) chapter for more information.
 * `table` - *Optional*. Table to read.
+* `persist` - *Optional*. One of the allowed Spark StorageLevels used to cache sources.
+  By default, sources are not cached. Supported Spark StorageLevels are:
+  * `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`,
+    `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`,
+    `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`.
 * `keyFields` - *Optional*. List of columns that form a Primary Key or are used to identify row within a dataset.
   Key fields are primarily used in error collection reports. For more details on error collection, see
   [Metric Error Collection](../02-general-information/04-ErrorCollection.md) chapter.
@@ -232,6 +261,11 @@ provide following parameters:
 * `path` - *Optional*. Path to read data from (if required).
 * `schema` - *Optional*. Explicit schema to be applied to data from the given source (if required).
 * `options` - *Optional*. Additional Spark parameters used to read data from the given source.
+* `persist` - *Optional*. One of the allowed Spark StorageLevels used to cache sources.
+  By default, sources are not cached. Supported Spark StorageLevels are:
+  * `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`,
+    `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`,
+    `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`.
 * `keyFields` - *Optional*. List of columns that form a Primary Key or are used to identify row within a dataset.
   Key fields are primarily used in error collection reports. For more details on error collection, see
   [Metric Error Collection](../02-general-information/04-ErrorCollection.md) chapter.
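
To illustrate the new `persist` option, a hedged sketch of a Hive table source cached in memory (HOCON). The `persist` and `keyFields` parameters follow the documentation above; the source ID, schema and table names, and the exact nesting under `sources.hive` are assumptions:

sources: {
  hive: [
    {
      id: "hive_source_1"        # assumed source ID
      schema: "some_db"          # assumed Hive schema name
      table: "some_table"        # assumed Hive table name
      persist: "MEMORY_ONLY"     # cache the source after reading; one of the StorageLevels listed above
      keyFields: ["id", "load_date"]
    }
  ]
}
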
4 changes: 2 additions & 2 deletions docs/03-job-configuration/05-VirtualSources.md
@@ -20,8 +20,8 @@ The following types of virtual sources are supported:

 All types of virtual sources have common features:

-* It is possible to cache virtual sources in memory or on disk. This could be handful when virtual sources is used as
-  parent for more than one virtual source. In such cases caching virtual source allows not to calculate it multiple times.
+* It is possible to cache virtual sources in memory or on disk. This can be handy when a virtual source is used as
+  a parent for more than one virtual source. In such cases, caching the virtual source avoids computing it multiple times.
 * Virtual source can be saved as a file in one of the supported format. This feature can be used for debugging purposes
   or just to keep data transformations applied during quality checks.

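
For completeness, a hedged sketch of a cached virtual source (HOCON). Only the idea of caching via `persist` is taken from the documentation above; the `kind`, `parentSources` and `query` field names and the overall nesting are assumptions for illustration:

virtualSources: [
  {
    id: "filtered_orders"                            # assumed virtual source ID
    kind: "sql"                                      # assumed virtual source type
    parentSources: ["orders"]                        # assumed parent source reference
    query: "SELECT * FROM orders WHERE amount > 0"
    persist: "MEMORY_AND_DISK"                       # cache because several downstream virtual sources reuse it
  }
]
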
18 changes: 15 additions & 3 deletions docs/03-job-configuration/09-Checks.md
@@ -38,6 +38,9 @@ Snapshot checks are configured using common set of parameters, which are:
 * `metric` - *Required*. Metric ID which results is checked.
 * `compareMetric` - *Optional*. Metric ID which result is used as a threshold.
 * `threshold` - *Optional*. Explicit threshold value.
+* `isCritical` - *Optional, default is `false`*. Boolean flag identifying whether this check is critical or not.
+  Check criticality is important when application check failure tolerance is set to `CRITICAL`.
+  For more information, see [Check Failure Tolerance](../02-general-information/08-CheckFailureTolerance.md).
 * `metadata` - *Optional*. List of user-defined metadata parameters specific to this check where each parameter
   is a string in format: `param.name=param.value`.

@@ -86,6 +89,9 @@ Trend checks are configured using following set of parameters:
   for `averageBoundRange` check.
 * `thresholdUpper` - *Required*. Sets maximum allowed upper deviation from historical average metric result. *Used only
   for `averageBoundRange` check.
+* `isCritical` - *Optional, default is `false`*. Boolean flag identifying whether this check is critical or not.
+  Check criticality is important when application check failure tolerance is set to `CRITICAL`.
+  For more information, see [Check Failure Tolerance](../02-general-information/08-CheckFailureTolerance.md).
 * `metadata` - *Optional*. List of user-defined metadata parameters specific to this metric where each parameter
   is a string in format: `param.name=param.value`.

@@ -110,6 +116,9 @@ Top N rank check is configured using following parameters:
   This number should be less than or equal to number of collected top values in top N metric.
 * `threshold` - *Required*. Maximum allowed Jacquard distance between current and previous sets of records from
   top N metric result. Should be a number in interval `[0, 1]`.
+* `isCritical` - *Optional, default is `false`*. Boolean flag identifying whether this check is critical or not.
+  Check criticality is important when application check failure tolerance is set to `CRITICAL`.
+  For more information, see [Check Failure Tolerance](../02-general-information/08-CheckFailureTolerance.md).
 * `metadata` - *Optional*. List of user-defined metadata parameters specific to this metric where each parameter
   is a string in format: `param.name=param.value`.

@@ -136,6 +145,9 @@ Expression checks are configured using following set of parameters:
 * `id` - *Required*. Check ID
 * `description` - *Optional*. Description of the check.
 * `formula` - *Required*. Check formula: boolean expression referring to metric results.
+* `isCritical` - *Optional, default is `false`*. Boolean flag identifying whether this check is critical or not.
+  Check criticality is important when application check failure tolerance is set to `CRITICAL`.
+  For more information, see [Check Failure Tolerance](../02-general-information/08-CheckFailureTolerance.md).
 * `metadata` - *Optional*. List of user-defined metadata parameters specific to this check where each parameter
   is a string in format: `param.name=param.value`.

@@ -197,13 +209,13 @@ jobConfig: {
         }
       ]
       equalTo: [
-        {id: "zero_nulls", description: "Hive Table1 mustn't contain nulls", metric: "hive_table_nulls", threshold: 0}
+        {id: "zero_nulls", description: "Hive Table1 mustn't contain nulls", metric: "hive_table_nulls", threshold: 0, isCritical: true}
       ]
       greaterThan: [
-        {id: "completeness_check", metric: "orc_data_compl", threshold: 0.99}
+        {id: "completeness_check", metric: "orc_data_compl", threshold: 0.99, isCritical: true}
       ]
       lessThan: [
-        {id: "null_threshold", metric: "pct_of_null", threshold: 0.01}
+        {id: "null_threshold", metric: "pct_of_null", threshold: 0.01, isCritical: true}
       ]
     }
     expression: [
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -54,6 +54,7 @@ nav:
     - 02-general-information/05-StreamingMode.md
     - 02-general-information/06-MetricSQLEquivalency.md
     - 02-general-information/07-MetricCalculatorEngines.md
+    - 02-general-information/08-CheckFailureTolerance.md
   - Job Configuration:
     - 03-job-configuration/index.md
     - 03-job-configuration/01-Connections.md
