Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding arima and linear regression trend metrics #64

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package org.checkita.dqf.config
import enumeratum.{Enum, EnumEntry}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.TimestampType
import org.checkita.dqf.config.Parsers.idParser

import scala.collection.immutable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ object RefinedTypes {
type NonEmptyURISeq = Seq[URI] Refined NonEmpty

/**
* Refinements for double sequences:
* Refinements for numeric sequences:
*/
type NonEmptyDoubleSeq = Seq[Double] Refined NonEmpty

type ThreeElemIntSeq = Seq[Int] Refined Size[Equal[W.`3`.T]]

/**
* All IDs are parsers via SparkSqlParser so they are valid SparkSQL identifiers
* This is required to eliminate any issues with interoperability with Spark.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.checkita.dqf.config.jobconf

import eu.timepit.refined.types.string.NonEmptyString
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.json4s.jackson.Serialization.write
import org.checkita.dqf.config.Enums.TrendCheckRule
import org.checkita.dqf.config.RefinedTypes._
Expand All @@ -20,6 +19,7 @@ import org.checkita.dqf.core.metrics.rdd.regular.BasicNumericRDDMetrics._
import org.checkita.dqf.core.metrics.rdd.regular.AlgebirdRDDMetrics._
import org.checkita.dqf.core.metrics.rdd.regular.MultiColumnRDDMetrics._
import org.checkita.dqf.core.metrics.rdd.regular.FileRDDMetrics._
import org.checkita.dqf.core.metrics.trend.{ARIMAModel, DescriptiveStatisticModel, LinearRegressionModel, TrendMetricModel}
import org.checkita.dqf.utils.Common.{getFieldsMap, jsonFormats}


Expand Down Expand Up @@ -98,7 +98,7 @@ object Metrics {
}

/**
* Base class for trend metrics that computes statistics over historical metric results.
* Base class for trend metrics that computes their values based on historical metric results.
*/
sealed abstract class TrendMetricConfig extends MetricConfig with TrendMetric {
val lookupMetric: ID
Expand Down Expand Up @@ -1296,7 +1296,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getMean
val model: TrendMetricModel = DescriptiveStatisticModel.Avg
val metricName: MetricName = MetricName.TrendAvg
val paramString: Option[String] = None
}
Expand All @@ -1321,7 +1321,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getStandardDeviation
val model: TrendMetricModel = DescriptiveStatisticModel.Std
val metricName: MetricName = MetricName.TrendStd
val paramString: Option[String] = None
}
Expand All @@ -1346,7 +1346,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getMin
val model: TrendMetricModel = DescriptiveStatisticModel.Min
val metricName: MetricName = MetricName.TrendMin
val paramString: Option[String] = None
}
Expand All @@ -1371,7 +1371,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getMax
val model: TrendMetricModel = DescriptiveStatisticModel.Max
val metricName: MetricName = MetricName.TrendMax
val paramString: Option[String] = None
}
Expand All @@ -1396,7 +1396,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getSum
val model: TrendMetricModel = DescriptiveStatisticModel.Sum
val metricName: MetricName = MetricName.TrendSum
val paramString: Option[String] = None
}
Expand All @@ -1421,7 +1421,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getPercentile(50)
val model: TrendMetricModel = DescriptiveStatisticModel.Median
val metricName: MetricName = MetricName.TrendMedian
val paramString: Option[String] = None
}
Expand All @@ -1446,7 +1446,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getPercentile(25)
val model: TrendMetricModel = DescriptiveStatisticModel.FirstQuantile
val metricName: MetricName = MetricName.TrendFirstQ
val paramString: Option[String] = None
}
Expand All @@ -1471,7 +1471,7 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getPercentile(75)
val model: TrendMetricModel = DescriptiveStatisticModel.ThirdQuantile
val metricName: MetricName = MetricName.TrendThirdQ
val paramString: Option[String] = None
}
Expand All @@ -1498,11 +1498,67 @@ object Metrics {
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
val aggFunc: DescriptiveStatistics => Double = stats => stats.getPercentile(quantile.value * 100)
val model: TrendMetricModel = new DescriptiveStatisticModel.Quantile(quantile.value * 100)
val metricName: MetricName = MetricName.TrendQuantile
val paramString: Option[String] = Some(write(Map("quantile" -> quantile.value)))
}

/**
 * Linear regression trend metric configuration.
 * Predicts the current metric value by fitting a linear regression model
 * (LinearRegressionModel) over historical results of the lookup metric.
 *
 * @param id Metric ID
 * @param description Metric Description
 * @param lookupMetric Metric which historical results are pulled for model fitting
 * @param rule Window calculation rule: by datetime or by number of records.
 * @param windowSize Size of the window used to collect historical metric results
 *                   for regression fitting (either a number of records or duration).
 * @param windowOffset Optional window offset (either a number of records or duration)
 * @param metadata List of metadata parameters specific to this metric
 */
final case class LinregTrendMetricConfig(
id: ID,
description: Option[NonEmptyString],
lookupMetric: ID,
rule: TrendCheckRule,
windowSize: NonEmptyString,
windowOffset: Option[NonEmptyString],
metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
// Linear regression takes no extra parameters, hence paramString is None.
val model: TrendMetricModel = LinearRegressionModel
val metricName: MetricName = MetricName.TrendLinReg
val paramString: Option[String] = None
}

/**
 * ARIMA trend metric configuration.
 * Predicts the current metric value by fitting an ARIMA(p, d, q) model
 * over historical results of the lookup metric.
 *
 * @param id Metric ID
 * @param description Metric Description
 * @param lookupMetric Metric which historical results are pulled for model fitting
 * @param rule Window calculation rule: by datetime or by number of records.
 * @param order Sequence of ARIMA order parameters: [p, d, q]
 * @param windowSize Size of the window used to collect historical metric results
 *                   for model fitting (either a number of records or duration).
 * @param windowOffset Optional window offset (either a number of records or duration)
 * @param metadata List of metadata parameters specific to this metric
 */
final case class ArimaTrendMetricConfig(
  id: ID,
  description: Option[NonEmptyString],
  lookupMetric: ID,
  rule: TrendCheckRule,
  order: ThreeElemIntSeq,
  windowSize: NonEmptyString,
  windowOffset: Option[NonEmptyString],
  metadata: Seq[SparkParam] = Seq.empty
) extends TrendMetricConfig {
  // `order` is refined to exactly three elements (ThreeElemIntSeq), so this
  // destructuring cannot fail. Destructuring once keeps `model` and `paramString`
  // consistent and avoids the duplicated non-exhaustive match they previously had.
  private val Seq(p, d, q) = order.value
  val model: TrendMetricModel = new ARIMAModel(p, d, q)
  val metricName: MetricName = MetricName.TrendArima
  val paramString: Option[String] = Some(write(Map("p" -> p, "d" -> d, "q" -> q)))
}

/** Data Quality job configuration section describing regular metrics
*
* @param rowCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,41 @@ object PostValidation {
))
}.toVector

/**
 * Ensure validity of ARIMA model order parameters in ARIMA trend metric configuration.
 *
 * Per ARIMA trend metric definition it is verified that:
 *   - `order` is a sequence of exactly three integers [p, d, q];
 *   - all order parameters are non-negative;
 *   - at least one of AR (p) or MA (q) orders is non-zero.
 *
 * @note This check is run at post-validation stage, as it is quite difficult to derive
 *       pureconfig readers for sealed trait families (thus approach is used to define kinded configurations)
 *       to run check during config parsing.
 */
val validateArimaTrendMetricOrder: ConfigObject => Vector[ConfigReaderFailure] = root =>
  Try(root.toConfig.getObjectList("jobConfig.metrics.trend").asScala)
    .getOrElse(List.empty).zipWithIndex
    .filter { case (tm, _) => tm.toConfig.getString("kind").toLowerCase == "arima" }
    .flatMap {
      case (tm, idx) =>
        val path = s"jobConfig.metrics.trend.$idx.${tm.get("id").renderWithOpts}"
        // A malformed `order` (wrong number of elements or non-integer values) must be
        // reported as a validation failure rather than escaping as a MatchError or
        // NumberFormatException from the post-validation stage.
        Try(tm.toConfig.getStringList("order").asScala.toSeq.map(_.toInt)).toOption match {
          case Some(Seq(p, d, q)) =>
            val validations = Seq(
              (
                (p: Int, d: Int, q: Int) => Seq(p, d, q).exists(_ < 0),
                "ARIMA model order parameters must be non-negative."
              ),
              (
                (p: Int, _: Int, q: Int) => p + q <= 0,
                "Either AR (p) or MA (q) order of ARIMA model (or both) must be non-zero."
              )
            )
            validations.filter(_._1(p, d, q)).map {
              case (_, msg) => ConvertFailure(UserValidationFailed(msg), None, path)
            }
          case _ => Seq(ConvertFailure(
            UserValidationFailed("ARIMA model order must be a sequence of exactly three integer values: [p, d, q]."),
            None,
            path
          ))
        }
    }.toVector

/**
* Validation to check if DQ job configuration contains missing references
* from load checks to sources
Expand Down Expand Up @@ -753,6 +788,7 @@ object PostValidation {
validateComposedMetrics,
validateMetricCrossReferences,
validateTrendMetricWindowSettings,
validateArimaTrendMetricOrder,
validateLoadCheckRefs,
validateLoadCheckSchemaRefs,
validateSnapshotCheckRefs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ trait DQJob extends Logging {
log.info(s"$stage Finalize trend metric results...")
metResults.toSeq.flatMap(_._2).filter(_.resultType == ResultType.TrendMetric)
.map { r =>
val mConfig = metricsMap.get(r.metricId)
val mConfig = trendMetricsMap.get(r.metricId)
val desc = mConfig.flatMap(_.description).map(_.value)
val params = mConfig.flatMap(_.paramString)
val metadata = mConfig.flatMap(_.metadataString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ object MetricName extends Enum[MetricName] {
case object TrendMedian extends MetricName("TREND_MEDIAN")
case object TrendFirstQ extends MetricName("TREND_FIRST_QUARTILE")
case object TrendThirdQ extends MetricName("TREND_THIRD_QUARTILE")
case object TrendQuantile extends MetricName("TREND_Quantile")
case object TrendQuantile extends MetricName("TREND_QUANTILE")
case object TrendLinReg extends MetricName("TREND_LINREG")
case object TrendArima extends MetricName("TREND_ARIMA")
case object RowCount extends MetricName("ROW_COUNT")
case object NullValues extends MetricName("NULL_VALUES")
case object EmptyValues extends MetricName("EMPTY_VALUES")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package org.checkita.dqf.core.metrics

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.checkita.dqf.config.Enums.TrendCheckRule
import org.checkita.dqf.core.metrics.trend.TrendMetricModel

import java.sql.Timestamp

/**
* Base class for trend metrics.
* All trend metrics must have defined following:
* - metric ID
* - metric name
* - lookup metric ID: metric which results will be pulled from DQ storage
* - aggregation function to compute statistic over historical metric results
* - model used to predict metric value based on historical metric results
* - rule to build time window: either record or duration
* - size of the window to pull historical results
* - offset from current date or record (depending on rule)
Expand All @@ -18,7 +20,7 @@ trait TrendMetric {
val metricId: String
val metricName: MetricName
val lookupMetricId: String
val aggFunc: DescriptiveStatistics => Double
val model: TrendMetricModel
val rule: TrendCheckRule
val wSize: String
val wOffset: Option[String]
Expand Down
Loading
Loading