From 8edc73f5918f4072ea36913f976ee6c74e7697cf Mon Sep 17 00:00:00 2001 From: Yury DYLKO Date: Mon, 11 Nov 2024 15:11:12 +0300 Subject: [PATCH 1/3] feat: adding arima and linear regression trend metrics - added two type of trend metric configurations - refactored trend metric calculator to allow implementation of more sophisticated models. - added linear regression model - added arima model --- .../scala/org/checkita/dqf/config/Enums.scala | 2 - .../checkita/dqf/config/RefinedTypes.scala | 5 +- .../checkita/dqf/config/jobconf/Metrics.scala | 78 +++- .../config/validation/PostValidation.scala | 36 ++ .../org/checkita/dqf/context/DQJob.scala | 2 +- .../dqf/core/metrics/MetricName.scala | 4 +- .../dqf/core/metrics/TrendMetric.scala | 8 +- .../dqf/core/metrics/trend/ARIMAModel.scala | 350 ++++++++++++++++++ .../trend/DescriptiveStatisticModel.scala | 47 +++ .../metrics/trend/LinearRegressionModel.scala | 45 +++ .../metrics/trend/TrendMetricCalculator.scala | 34 +- .../core/metrics/trend/TrendMetricModel.scala | 22 ++ .../dqf/core/serialization/Implicits.scala | 4 + 13 files changed, 591 insertions(+), 46 deletions(-) create mode 100644 checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/ARIMAModel.scala create mode 100644 checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/DescriptiveStatisticModel.scala create mode 100644 checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/LinearRegressionModel.scala create mode 100644 checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricModel.scala diff --git a/checkita-core/src/main/scala/org/checkita/dqf/config/Enums.scala b/checkita-core/src/main/scala/org/checkita/dqf/config/Enums.scala index 204dfa3..aa1b513 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/config/Enums.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/config/Enums.scala @@ -3,8 +3,6 @@ package org.checkita.dqf.config import enumeratum.{Enum, EnumEntry} import org.apache.spark.sql.Column import org.apache.spark.sql.functions.expr -import org.apache.spark.sql.types.TimestampType -import org.checkita.dqf.config.Parsers.idParser import scala.collection.immutable diff --git a/checkita-core/src/main/scala/org/checkita/dqf/config/RefinedTypes.scala b/checkita-core/src/main/scala/org/checkita/dqf/config/RefinedTypes.scala index d1317eb..059c013 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/config/RefinedTypes.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/config/RefinedTypes.scala @@ -38,10 +38,11 @@ object RefinedTypes { type NonEmptyURISeq = Seq[URI] Refined NonEmpty /** - * Refinements for double sequences: + * Refinements for numeric sequences: */ type NonEmptyDoubleSeq = Seq[Double] Refined NonEmpty - + type ThreeElemIntSeq = Seq[Int] Refined Size[Equal[W.`3`.T]] + /** * All IDs are parsers via SparkSqlParser so they are valid SparkSQL identifiers * This is required to eliminate any issues with interoperability with Spark. 
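The new `ThreeElemIntSeq` refinement above constrains the ARIMA `order` parameter to exactly three integers at config-parsing time. Below is a minimal, illustrative sketch (not part of this patch) of how such a refinement behaves at runtime, assuming the standard `refineV` helper from the refined library:

```scala
import eu.timepit.refined.{refineV, W}
import eu.timepit.refined.api.Refined
import eu.timepit.refined.collection.Size
import eu.timepit.refined.generic.Equal

// Same shape as the ThreeElemIntSeq alias added above.
type ThreeElemIntSeq = Seq[Int] Refined Size[Equal[W.`3`.T]]

// A valid ARIMA order [p, d, q] passes the refinement...
val ok: Either[String, ThreeElemIntSeq] = refineV[Size[Equal[W.`3`.T]]](Seq(1, 1, 1))
// ...while a sequence of any other length is rejected with an error message.
val bad: Either[String, ThreeElemIntSeq] = refineV[Size[Equal[W.`3`.T]]](Seq(1, 1))
```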
diff --git a/checkita-core/src/main/scala/org/checkita/dqf/config/jobconf/Metrics.scala b/checkita-core/src/main/scala/org/checkita/dqf/config/jobconf/Metrics.scala index 141db7e..033ec49 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/config/jobconf/Metrics.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/config/jobconf/Metrics.scala @@ -1,7 +1,6 @@ package org.checkita.dqf.config.jobconf import eu.timepit.refined.types.string.NonEmptyString -import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.json4s.jackson.Serialization.write import org.checkita.dqf.config.Enums.TrendCheckRule import org.checkita.dqf.config.RefinedTypes._ @@ -20,6 +19,7 @@ import org.checkita.dqf.core.metrics.rdd.regular.BasicNumericRDDMetrics._ import org.checkita.dqf.core.metrics.rdd.regular.AlgebirdRDDMetrics._ import org.checkita.dqf.core.metrics.rdd.regular.MultiColumnRDDMetrics._ import org.checkita.dqf.core.metrics.rdd.regular.FileRDDMetrics._ +import org.checkita.dqf.core.metrics.trend.{ARIMAModel, DescriptiveStatisticModel, LinearRegressionModel, TrendMetricModel} import org.checkita.dqf.utils.Common.{getFieldsMap, jsonFormats} @@ -98,7 +98,7 @@ object Metrics { } /** - * Base class for trend metrics that computes statistics over historical metric results. + * Base class for trend metrics that computes their values based on historical metric results. */ sealed abstract class TrendMetricConfig extends MetricConfig with TrendMetric { val lookupMetric: ID @@ -1296,7 +1296,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getMean + val model: TrendMetricModel = DescriptiveStatisticModel.Avg val metricName: MetricName = MetricName.TrendAvg val paramString: Option[String] = None } @@ -1321,7 +1321,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getStandardDeviation + val model: TrendMetricModel = DescriptiveStatisticModel.Std val metricName: MetricName = MetricName.TrendStd val paramString: Option[String] = None } @@ -1346,7 +1346,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getMin + val model: TrendMetricModel = DescriptiveStatisticModel.Min val metricName: MetricName = MetricName.TrendMin val paramString: Option[String] = None } @@ -1371,7 +1371,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getMax + val model: TrendMetricModel = DescriptiveStatisticModel.Max val metricName: MetricName = MetricName.TrendMax val paramString: Option[String] = None } @@ -1396,7 +1396,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getSum + val model: TrendMetricModel = DescriptiveStatisticModel.Sum val metricName: MetricName = MetricName.TrendSum val paramString: Option[String] = None } @@ -1421,7 +1421,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = 
stats => stats.getPercentile(50) + val model: TrendMetricModel = DescriptiveStatisticModel.Median val metricName: MetricName = MetricName.TrendMedian val paramString: Option[String] = None } @@ -1446,7 +1446,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getPercentile(25) + val model: TrendMetricModel = DescriptiveStatisticModel.FirstQuantile val metricName: MetricName = MetricName.TrendFirstQ val paramString: Option[String] = None } @@ -1471,7 +1471,7 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getPercentile(75) + val model: TrendMetricModel = DescriptiveStatisticModel.ThirdQuantile val metricName: MetricName = MetricName.TrendThirdQ val paramString: Option[String] = None } @@ -1498,11 +1498,67 @@ object Metrics { windowOffset: Option[NonEmptyString], metadata: Seq[SparkParam] = Seq.empty ) extends TrendMetricConfig { - val aggFunc: DescriptiveStatistics => Double = stats => stats.getPercentile(quantile.value * 100) + val model: TrendMetricModel = new DescriptiveStatisticModel.Quantile(quantile.value * 100) val metricName: MetricName = MetricName.TrendQuantile val paramString: Option[String] = Some(write(Map("quantile" -> quantile.value))) } + /** + * Linear regression trend metric configuration + * + * @param id Metric ID + * @param description Metric Description + * @param lookupMetric Metric which historical results to pull for statistic calculation + * @param rule Window calculation rule: by datetime or by number of records. + * @param windowSize Size of the window for average metric value calculation (either a number of records or duration). + * @param windowOffset Optional window offset (either a number of records or duration) + * @param metadata List of metadata parameters specific to this metric + */ + final case class LinregTrendMetricConfig( + id: ID, + description: Option[NonEmptyString], + lookupMetric: ID, + rule: TrendCheckRule, + windowSize: NonEmptyString, + windowOffset: Option[NonEmptyString], + metadata: Seq[SparkParam] = Seq.empty + ) extends TrendMetricConfig { + val model: TrendMetricModel = LinearRegressionModel + val metricName: MetricName = MetricName.TrendLinReg + val paramString: Option[String] = None + } + + /** + * ARIMA trend metric configuration + * + * @param id Metric ID + * @param description Metric Description + * @param lookupMetric Metric which historical results to pull for statistic calculation + * @param rule Window calculation rule: by datetime or by number of records. + * @param order Sequence of ARIMA order parameters: [p, d, q] + * @param windowSize Size of the window for average metric value calculation (either a number of records or duration). 
+ * @param windowOffset Optional window offset (either a number of records or duration) + * @param metadata List of metadata parameters specific to this metric + */ + final case class ArimaTrendMetricConfig( + id: ID, + description: Option[NonEmptyString], + lookupMetric: ID, + rule: TrendCheckRule, + order: ThreeElemIntSeq, + windowSize: NonEmptyString, + windowOffset: Option[NonEmptyString], + metadata: Seq[SparkParam] = Seq.empty + ) extends TrendMetricConfig { + val model: TrendMetricModel = order.value match { + case Seq(p, d, q) => new ARIMAModel(p, d, q) + } + val metricName: MetricName = MetricName.TrendArima + val paramString: Option[String] = order.value match { + case Seq(p, d, q) => Some(write(Map("p" -> p, "d" -> d, "q" -> q))) + } + } + /** Data Quality job configuration section describing regular metrics * * @param rowCount diff --git a/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala b/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala index 3f21710..c15f54c 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala @@ -591,6 +591,41 @@ object PostValidation { )) }.toVector + /** + * Ensure validity of ARIMA model order parameters in ARIMA trend metric configuration. + * + * @note This check is run at post-validation stage, as it is quite difficult to derive + * pureconfig readers for sealed trait families (thus approach is used to define kinded configurations) + * to run check during config parsing. + */ + val validateArimaTrendMetricOrder: ConfigObject => Vector[ConfigReaderFailure] = root => + Try(root.toConfig.getObjectList("jobConfig.metrics.trend").asScala) + .getOrElse(List.empty).zipWithIndex + .filter(c => c._1.toConfig.getString("kind").toLowerCase == "arima") + .flatMap { + case (tm, idx) => + val order = tm.toConfig.getStringList("order").asScala.map(_.toInt) + val validations = Seq( + ( + (p: Int, d: Int, q: Int) => Seq(p, d, q).exists(_ < 0), + "ARIMA model order parameters must be non-negative." + ), + ( + (p: Int, _: Int, q: Int) => p + q <= 0, + "Either AR (p) or MA (q) order of ARIMA model (or both) must be non-zero." 
+ ) + ) + order match { + case Seq(p, d, q) => validations.filter(_._1(p, d, q)).map{ + case (_, msg) => ConvertFailure( + UserValidationFailed(msg), + None, + s"jobConfig.metrics.trend.$idx.${tm.get("id").renderWithOpts}" + ) + } + } + }.toVector + /** * Validation to check if DQ job configuration contains missing references * from load checks to sources @@ -753,6 +788,7 @@ object PostValidation { validateComposedMetrics, validateMetricCrossReferences, validateTrendMetricWindowSettings, + validateArimaTrendMetricOrder, validateLoadCheckRefs, validateLoadCheckSchemaRefs, validateSnapshotCheckRefs, diff --git a/checkita-core/src/main/scala/org/checkita/dqf/context/DQJob.scala b/checkita-core/src/main/scala/org/checkita/dqf/context/DQJob.scala index 71b0654..cce1aef 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/context/DQJob.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/context/DQJob.scala @@ -388,7 +388,7 @@ trait DQJob extends Logging { log.info(s"$stage Finalize trend metric results...") metResults.toSeq.flatMap(_._2).filter(_.resultType == ResultType.TrendMetric) .map { r => - val mConfig = metricsMap.get(r.metricId) + val mConfig = trendMetricsMap.get(r.metricId) val desc = mConfig.flatMap(_.description).map(_.value) val params = mConfig.flatMap(_.paramString) val metadata = mConfig.flatMap(_.metadataString) diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/MetricName.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/MetricName.scala index 3c519fe..173cfc8 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/MetricName.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/MetricName.scala @@ -19,7 +19,9 @@ object MetricName extends Enum[MetricName] { case object TrendMedian extends MetricName("TREND_MEDIAN") case object TrendFirstQ extends MetricName("TREND_FIRST_QUARTILE") case object TrendThirdQ extends MetricName("TREND_THIRD_QUARTILE") - case object TrendQuantile extends MetricName("TREND_Quantile") + case object TrendQuantile extends MetricName("TREND_QUANTILE") + case object TrendLinReg extends MetricName("TREND_LINREG") + case object TrendArima extends MetricName("TREND_ARIMA") case object RowCount extends MetricName("ROW_COUNT") case object NullValues extends MetricName("NULL_VALUES") case object EmptyValues extends MetricName("EMPTY_VALUES") diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/TrendMetric.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/TrendMetric.scala index 47d3811..5bef452 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/TrendMetric.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/TrendMetric.scala @@ -1,7 +1,9 @@ package org.checkita.dqf.core.metrics -import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.checkita.dqf.config.Enums.TrendCheckRule +import org.checkita.dqf.core.metrics.trend.TrendMetricModel + +import java.sql.Timestamp /** * Base class for trend metrics. 
@@ -9,7 +11,7 @@ import org.checkita.dqf.config.Enums.TrendCheckRule * - metric ID * - metric name * - lookup metric ID: metric which results will be pulled from DQ storage - * - aggregation function to compute statistic over historical metric results + * - model used to predict metric value based on historical metric results * - rule to build time window: either record or duration * - size of the window to pull historical results * - offset from current date or record (depending on rule) @@ -18,7 +20,7 @@ trait TrendMetric { val metricId: String val metricName: MetricName val lookupMetricId: String - val aggFunc: DescriptiveStatistics => Double + val model: TrendMetricModel val rule: TrendCheckRule val wSize: String val wOffset: Option[String] diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/ARIMAModel.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/ARIMAModel.scala new file mode 100644 index 0000000..c8c12a9 --- /dev/null +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/ARIMAModel.scala @@ -0,0 +1,350 @@ +package org.checkita.dqf.core.metrics.trend + +import org.apache.commons.math3.optim.nonlinear.scalar.gradient.NonLinearConjugateGradientOptimizer +import org.apache.commons.math3.optim.nonlinear.scalar.{GoalType, ObjectiveFunction, ObjectiveFunctionGradient} +import org.apache.commons.math3.optim.{InitialGuess, MaxEval, MaxIter, SimpleValueChecker} +import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression +import org.checkita.dqf.utils.Common.jsonFormats +import org.json4s.jackson.Serialization.write + +import java.sql.Timestamp +import scala.annotation.tailrec + +class ARIMAModel(p: Int, d: Int, q: Int) extends TrendMetricModel { + + /** + * Addition to max lag for ARMA model initial parameter estimation by Hannan-Rissanen algorithm. + */ + private val hannanRissanenMaxLagDelta = 3 + + /** + * Infer trend metric value using ARIMA model + * fitted over historical metric results + * + * @param data Historical metric results. + * @param ts UTC timestamp of current reference date. + * @return Trend metric value and optional additional result + * (may contain some information about model parameters) + */ + override def infer(data: Seq[(Timestamp, Double)], + ts: Timestamp): (Double, Option[String]) = { + + assert(Seq(p, d, q).forall(_ >= 0), "Negative parameters for ARIMA model are not allowed.") + assert(p + q > 0, "Either AR (p) or MA (q) parameter of ARIMA model or both must be non-zero.") + assert( + data.size > 2 * (math.max(p, q) + hannanRissanenMaxLagDelta) + d + 1, + "In order to fit ARIMA model, the size of historical data set must be greater than " + + s"'2 * (max(p, q) + $hannanRissanenMaxLagDelta) + d + 1' of ARIMA order defined in trend metric: " + + s"dataset contains ${data.size} metric results but minimum required size is " + + 2 * (math.max(p, q) + hannanRissanenMaxLagDelta) + d + 2 + ) + + val model = new ARIMAOptimizer(data.sortBy(_._1.getTime).map(_._2), this.p, this.d, this.q).fit + val modelInfo = Map( + "No. 
Observations" -> model.noObs, + "Log Likelihood" -> model.logLikelihood, + "AIC (approximate)" -> model.approxAIC, + "Sigma2" -> model.sigma, + "Coefficients" -> Map( + "Intercept" -> model.getIntercept, + "AR Coefficients" -> model.getARCoefs, + "MA Coefficients" -> model.getMACoefs + ) + ) + + model.forecast -> Some(write(modelInfo)) + } + + class ARIMAOptimizer(data: Seq[Double], p: Int, d: Int, q: Int) { + private lazy val (diffedData, firstObs) = makeDiff(data, d) + private lazy val diffedSiftedData: Seq[Seq[Double]] = shiftData(diffedData, p, keepOriginal = true) + + class ARIMA(val params: Seq[Double]) { + private lazy val intercept: Double = params.head + private lazy val arCoefs: Seq[Double] = params.slice(1, p + 1) + private lazy val maCoefs: Seq[Double] = params.takeRight(q) + + /** + * Forecasts next point. + */ + def forecast: Double = { + val forecastPoint = 0.0 +: getNextX(diffedData, p) + val dataForecast = diffedSiftedData.drop(math.max(q - p, 0)) :+ forecastPoint + val forecastY = generatePredictions(dataForecast).last + inverseDiff(diffedData :+ forecastY, d, firstObs).last + } + + def getIntercept: Double = intercept + def getARCoefs: Seq[Double] = arCoefs + def getMACoefs: Seq[Double] = maCoefs + + /** + * Computes Sigma2 + */ + def sigma: Double = getCSS / diffedData.size + + /** + * Computes log likelihood for current model parameters. + * @return Log likelihood of the model + */ + def logLikelihood: Double = { + val css = getCSS + val sigma = css / diffedData.size + + -0.5 * diffedData.size * math.log(2 * math.Pi * sigma) - css / (2 * sigma) + } + + /** + * Computes approximate AIC criterion for current model parameters + * @return Approximate AIC. + */ + def approxAIC: Double = -2 * this.logLikelihood + 2 * (p + q + 1) + + /** + * Gets number of observations (size of initial time-series data) + * @return Number of observations. + */ + def noObs: Int = data.size + + /** + * Yields predictions for train dataset. + * Mainly used to compute residuals. + * @return Fitted values for train dataset. + */ + def fitted: Seq[Double] = this.generatePredictions( + diffedSiftedData.drop(math.max(q - p, 0)) + ) + + private def getCSS: Double = diffedData.drop(math.max(p, q)) + .zip(this.fitted) + .map{ case (yTrue, yFit) => math.pow(yTrue - yFit, 2)} + .sum + + /** + * Yields prediction for provided data. + * Data is a matrix where first column is true values + * and the rest ones are shifted AR terms. + * + * @return Predictions for train data. + */ + private def generatePredictions(shiftedData: Seq[Seq[Double]]): Seq[Double] = { + val (fitted, _) = shiftedData + .foldLeft(Seq.empty[Double] -> Seq.fill[Double](q)(0)){ (acc, row) => + val (yTrue, arX) = row.head -> row.tail + val (yFit, maX) = acc + val arY = arX.zip(arCoefs).map(xc => xc._1 * xc._2).sum + val maY = maX.zip(maCoefs).map(xc => xc._1 * xc._2).sum + val y = intercept + arY + maY + val e = yTrue - y + (yFit :+ y, e +: maX.dropRight(1)) + } + fitted + } + } + + /** + * Fits ARIMA model to provided time-series data. + * @return Trained ARIMA model. 
+ */ + def fit: ARIMA = { + if (p > 0 && q == 0) { + // ARIMA converges to simple AR model + val (params, _) = fitARShifted(diffedSiftedData) + new ARIMA(params) + } else { + val initialGuess = new InitialGuess(initializeParameters.toArray) + + val optimizer = new NonLinearConjugateGradientOptimizer( + NonLinearConjugateGradientOptimizer.Formula.FLETCHER_REEVES, + new SimpleValueChecker(1e-7, 1e-7) + ) + + val objFunction = new ObjectiveFunction( + (params: Array[Double]) => new ARIMA(params).logLikelihood + ) + + val gradientFunction = new ObjectiveFunctionGradient( + (params: Array[Double]) => computeLLGradient(params).toArray + ) + val optimalParams = optimizer.optimize( + objFunction, gradientFunction, GoalType.MAXIMIZE, initialGuess, new MaxIter(100000), new MaxEval(100000) + ) + new ARIMA(optimalParams.getPoint) + } + } + + /** + * Gets initial estimation of ARIMA model parameters using Hannan-Rissanen algorithm. + * + * @return Initial estimate of ARIMA model parameters. + */ + private def initializeParameters: Seq[Double] = { + val maxLag = math.max(p, q) + hannanRissanenMaxLagDelta + + // estimate errors from AR(maxLag) + val (_, errors) = fitAR(diffedData, maxLag) + + val shiftedErrors = shiftData(errors, q, keepOriginal = false).drop(math.max(p - q, 0)) + val shiftedData = shiftData(diffedData.drop(maxLag), p, keepOriginal = true).drop(math.max(q - p, 0)) + + assert( + shiftedData.size == shiftedErrors.size, + "In order to estimate initial ARIMA parameters error terms and data terms must have the same number of rows." + ) + + val (params, _) = fitARShifted( + shiftedData.zip(shiftedErrors).map { case (d, e) => d ++ e } + ) + + params + } + + /** + * Compute log likelihood gradient + * by applying small perturbations to each parameter of ARIMA model + * + * @return Log likelihood gradient vector. + */ + private def computeLLGradient(params: Seq[Double]): Seq[Double] = { + val delta = params.map(_.abs).min / 1e5 + + val genDeltaLLs = (d: Double) => (0 to p + q).map { idx => + params.zipWithIndex.map { + case (param, pos) => if (pos == idx) param + d else param + } + }.map(new ARIMA(_).logLikelihood) + + val plusDeltaLLs = genDeltaLLs(delta) + val minusDeltaLLs = genDeltaLLs(-delta) + + plusDeltaLLs.zip(minusDeltaLLs).map{ + case (p, m) => (p - m) / 2 / delta + } + } + + /** + * Trains auto-regression model provided with prepared data shifted to the desired lag indices. + * First column must contain Y-values. + * + * @param trainData data to fit regression model + * @return Regression model parameters and residuals. + */ + private def fitARShifted(trainData: Seq[Seq[Double]]): (Array[Double], Array[Double]) = { + val (arY, arX) = splitXY(trainData) + if (arY.nonEmpty && arX.nonEmpty) { + val linReg = new OLSMultipleLinearRegression() + linReg.newSampleData(arY, arX) + linReg.estimateRegressionParameters() -> linReg.estimateResiduals() + } else Array.empty[Double] -> Array.empty[Double] + } + + /** + * Trains auto-regression model provided with time-series data and lag indices to train for. + * + * @param trainData Time-series train data. + * @param maxLag Maximum lag index to include in AR terms. + * @return Regression model parameters and residuals + */ + private def fitAR(trainData: Seq[Double], maxLag: Int): (Array[Double], Array[Double]) = { + val shiftedData = shiftData(trainData, maxLag, keepOriginal = true) + fitARShifted(shiftedData) + } + + /** + * Make differencing of data required number of times (order). 
+ * + * @param data Input time-series data + * @param order Number of differencing operations to perform. + * @return Diffed time-series data and first observations for each differencing order. + * @note First observations are essential to restore time-series data. + */ + @tailrec + private def makeDiff(data: Seq[Double], + order: Int, + firstObs: Seq[Double] = Seq.empty): (Seq[Double], Seq[Double]) = + if (order <= 0) data -> firstObs + else { + val diff = data.sliding(2).map(s => s.tail.head - s.head).toSeq + makeDiff(diff, order - 1, data.head +: firstObs) + } + + /** + * Restores original time-series data from diffed one. + * In order to restore time-series to its original state + * it is essential to provide first observations points. + * + * @param data Diffed time-series data. + * @param order Order of differencing to inverse + * @param firstObservations Sequence of first observations at each level of differencing. + * Order of observations in sequence must be reverted: + * observation from last differencing must come first, + * while first observation from original data should be at last position. + * @return + */ + @tailrec + private def inverseDiff(data: Seq[Double], + order: Int, + firstObservations: Seq[Double]): Seq[Double] = + if (order <= 0) data + else { + + assert( + firstObservations.size == order, + "In order to inverse time-series differencing it is required to provide " + + "first observations for each order of differencing. " + + s"Provided ${firstObservations.size} observation, but order of differencing is $order." + ) + + val (curObs, restObs) = firstObservations.head -> firstObservations.tail + val inverse = data.foldLeft(Vector(curObs))((v, d) => v :+ v.last + d) + inverseDiff(inverse, order - 1, restObs) + } + + /** + * Crates training data set for AR model by shifting time-series data to the specified lags. + * + * @param data Input time-series data. + * @param maxLag Maximum lag index to shift values for. + * @param keepOriginal Boolean flag indicating whether original data is kept in the output array. + * @return Array with data shifted to specified lags from 1 to maxLag. + */ + private def shiftData(data: Seq[Double], maxLag: Int, keepOriginal: Boolean): Seq[Seq[Double]] = { + if (maxLag == 0 && keepOriginal) data.map(Seq(_)) + else if (maxLag == 0) data.map(_ => Seq.empty[Double]) + else { + val allLags = if (keepOriginal) 0 to maxLag else 1 to maxLag + val window = maxLag + 1 + data + .sliding(window) + .map(_.toArray) + .map { group => + allLags.map(lag => group(window - 1 - lag)) + }.toSeq + } + } + + /** + * Retrieves X-vector from timeseries for next value forecasting by AR model. + * Collects values for lag indices from 1 to maxLag. + * + * @param data Historical metric results. + * @param maxLag Max lag index to retrieve values for. + * @return X-vector for new value prediction. + */ + def getNextX(data: Seq[Double], maxLag: Int): Seq[Double] = + if (maxLag == 0) Seq.empty[Double] + else (1 to maxLag).map(l => data(data.size - l)) + + /** + * Splits shifted data to Y vector and X-feature matrix. + * Sequence is converted to array for input to linear regression model. + * + * @param data Shifted data to fir linear regression. + * @return Y-vector and X-feature matrix. 
+ */ + private def splitXY(data: Seq[Seq[Double]]): (Array[Double], Array[Array[Double]]) = { + if (data.forall(_.size <= 1)) Array.empty[Double] -> Array.empty[Array[Double]] + else data.map(_.head).toArray -> data.map(_.tail.toArray).toArray + } + } +} diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/DescriptiveStatisticModel.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/DescriptiveStatisticModel.scala new file mode 100644 index 0000000..e85a3cf --- /dev/null +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/DescriptiveStatisticModel.scala @@ -0,0 +1,47 @@ +package org.checkita.dqf.core.metrics.trend + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics +import org.apache.commons.math3.stat.descriptive.rank.Percentile +import org.apache.commons.math3.stat.descriptive.rank.Percentile.EstimationType + +import java.sql.Timestamp + +/** + * Base class to compute statistics over historical metric results. + * @param f Function that is used to compute statistic value + */ +class DescriptiveStatisticModel(f: DescriptiveStatistics => Double) extends TrendMetricModel { + + /** + * Function which aggregates historical metric results and computes a desired statistic. + * + * @param data Historical metric results + * @return Statistic computed over historical metric results. + * + * @note Linear method is used to compute quantiles (method #7 in [1]). + * + * [1] R. J. Hyndman and Y. Fan, "Sample quantiles in statistical packages, + * "The American Statistician, 50(4), pp. 361-365, 1996 + */ + override def infer(data: Seq[(Timestamp, Double)], ts: Timestamp): (Double, Option[String]) = { + val stats = new DescriptiveStatistics + stats.setPercentileImpl(new Percentile().withEstimationType(EstimationType.R_7)) + data.foreach(d => stats.addValue(d._2)) + f(stats) -> None + } +} + +/** + * Supported statistics + */ +object DescriptiveStatisticModel { + object Avg extends DescriptiveStatisticModel(_.getMean) + object Std extends DescriptiveStatisticModel(_.getStandardDeviation) + object Min extends DescriptiveStatisticModel(_.getMin) + object Max extends DescriptiveStatisticModel(_.getMax) + object Sum extends DescriptiveStatisticModel(_.getSum) + object Median extends DescriptiveStatisticModel(_.getPercentile(50)) + object FirstQuantile extends DescriptiveStatisticModel(_.getPercentile(25)) + object ThirdQuantile extends DescriptiveStatisticModel(_.getPercentile(75)) + class Quantile(quantile: Double) extends DescriptiveStatisticModel(_.getPercentile(quantile)) +} diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/LinearRegressionModel.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/LinearRegressionModel.scala new file mode 100644 index 0000000..75f6c19 --- /dev/null +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/LinearRegressionModel.scala @@ -0,0 +1,45 @@ +package org.checkita.dqf.core.metrics.trend +import org.apache.commons.math3.stat.regression.SimpleRegression +import org.checkita.dqf.utils.Common.jsonFormats +import org.json4s.jackson.Serialization.write + +import java.sql.Timestamp + +/** + * Simple linear regression model for trend metrics + */ +object LinearRegressionModel extends TrendMetricModel { + + /** + * Infer trend metric value using simple linear regression + * fitted over historical metric results + * + * @param data Historical metric results. + * @param ts UTC timestamp of current reference date. 
+ * @return Trend metric value + * + * @note Timestamps are converted to epoch milliseconds. + */ + override def infer(data: Seq[(Timestamp, Double)], ts: Timestamp): (Double, Option[String]) = { + + assert( + data.size > 1, + "In order to fit linear regression model, the historical data set must contain at least 2 results, " + + s"but dataset contains only ${data.size} metric results." + ) + + val linReg = new SimpleRegression() + + data.foreach{ d => + linReg.addData(d._1.toInstant.toEpochMilli, d._2) + } + + val forecast = linReg.predict(ts.toInstant.toEpochMilli) + val info = Map( + "No. Observations" -> linReg.getN, + "MSE" -> linReg.getSumSquaredErrors / linReg.getN + ) + + forecast -> Some(write(info)) + } +} diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricCalculator.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricCalculator.scala index 3f510cf..360ff69 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricCalculator.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricCalculator.scala @@ -1,8 +1,5 @@ package org.checkita.dqf.core.metrics.trend -import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics -import org.apache.commons.math3.stat.descriptive.rank.Percentile -import org.apache.commons.math3.stat.descriptive.rank.Percentile.EstimationType import org.checkita.dqf.appsettings.AppSettings import org.checkita.dqf.core.CalculatorStatus import org.checkita.dqf.core.Results.{MetricCalculatorResult, ResultType} @@ -11,27 +8,10 @@ import org.checkita.dqf.core.metrics.TrendMetric import org.checkita.dqf.storage.Managers.DqStorageManager import org.checkita.dqf.storage.Models._ +import java.sql.Timestamp import scala.util.{Failure, Success, Try} object TrendMetricCalculator { - - /** - * Function which aggregates historical metric results and computes a desired statistic. - * - * @param data Historical metric results - * @return Statistic computed over historical metric results. - * - * @note Linear method is used to compute quantiles (method #7 in [1]). - * - * [1] R. J. Hyndman and Y. Fan, "Sample quantiles in statistical packages, - * "The American Statistician, 50(4), pp. 361-365, 1996 - */ - private def aggregate(data: Seq[Double], f: DescriptiveStatistics => Double): Double = { - val stats = new DescriptiveStatistics - stats.setPercentileImpl(new Percentile().withEstimationType(EstimationType.R_7)) - data.foreach(stats.addValue) - f(stats) - } /** * Returns metric calculator result with error status and corresponding error message @@ -63,7 +43,7 @@ object TrendMetricCalculator { lookupMetResultType: ResultType) (implicit jobId: String, manager: Option[DqStorageManager], - settings: AppSettings): Seq[Double] = { + settings: AppSettings): Seq[(Timestamp, Double)] = { require( manager.nonEmpty, s"In order to perform trend metric calculation it is required to have a valid connection to results storage." 
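With this refactoring, the calculator hands timestamped historical results to whichever model the trend metric configuration selects, instead of aggregating a bare sequence of doubles itself. A minimal sketch of the resulting data shape and model call, using made-up values (illustrative only, not part of this patch):

```scala
import java.sql.Timestamp
import org.checkita.dqf.core.metrics.trend.LinearRegressionModel

// Hypothetical historical results for a lookup metric: (referenceDate, result) pairs.
val history: Seq[(Timestamp, Double)] = Seq(
  Timestamp.valueOf("2024-11-08 00:00:00") -> 101.0,
  Timestamp.valueOf("2024-11-09 00:00:00") -> 103.5,
  Timestamp.valueOf("2024-11-10 00:00:00") -> 104.2
)

// The model returns the forecast for the current reference date
// together with optional JSON-encoded model details.
val (forecast, info) = LinearRegressionModel.infer(history, Timestamp.valueOf("2024-11-11 00:00:00"))
```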
@@ -85,7 +65,9 @@ object TrendMetricCalculator { ) } - val resultVals = historyResults.map(_.result).filterNot(_.isNaN) + val resultVals = historyResults + .filterNot(_.result.isNaN) + .map(r => (r.referenceDate, r.result)) if (resultVals.isEmpty) throw new IllegalArgumentException( s"Unable to perform calculation of trend metric '${tm.metricId}' due to historical results were not found " + @@ -120,13 +102,13 @@ object TrendMetricCalculator { manager: Option[DqStorageManager], settings: AppSettings): MetricCalculatorResult = Try { val historicalResults = loadHistoricalResults(tm, lookupMetResultType) - aggregate(historicalResults, tm.aggFunc) + tm.model.infer(historicalResults, settings.referenceDateTime.getUtcTS) } match { case Success(result) => MetricCalculatorResult( tm.metricId, tm.metricName.entryName, - result, - None, + result._1, + result._2, lookupMetSourceIds, Seq.empty, Seq.empty, diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricModel.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricModel.scala new file mode 100644 index 0000000..e30727c --- /dev/null +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/metrics/trend/TrendMetricModel.scala @@ -0,0 +1,22 @@ +package org.checkita.dqf.core.metrics.trend + +import java.sql.Timestamp + +/** + * Trait that should be implemented by + * all trend metrics as it provides + * the fundamental method for inferring + * trend metric value. + */ +trait TrendMetricModel { + /** + * Infer trend metric value for current reference date + * based on historical metric results. + * @param data Historical metric results. + * @param ts UTC timestamp of current reference date. + * @return Trend metric value and optional additional result + * (may contain some information about model parameters) + */ + def infer(data: Seq[(Timestamp, Double)], ts: Timestamp): (Double, Option[String]) +} + diff --git a/checkita-core/src/main/scala/org/checkita/dqf/core/serialization/Implicits.scala b/checkita-core/src/main/scala/org/checkita/dqf/core/serialization/Implicits.scala index 46ad071..2a9de65 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/core/serialization/Implicits.scala +++ b/checkita-core/src/main/scala/org/checkita/dqf/core/serialization/Implicits.scala @@ -326,6 +326,10 @@ object Implicits extends SerDeTransformations getProductSerDe(ThirdQuartileTrendMetricConfig.unapply, ThirdQuartileTrendMetricConfig.tupled) case MetricName.TrendQuantile => getProductSerDe(QuantileTrendMetricConfig.unapply, QuantileTrendMetricConfig.tupled) + case MetricName.TrendLinReg => + getProductSerDe(LinregTrendMetricConfig.unapply, LinregTrendMetricConfig.tupled) + case MetricName.TrendArima => + getProductSerDe(ArimaTrendMetricConfig.unapply, ArimaTrendMetricConfig.tupled) } serDe.asInstanceOf[SerDe[RegularMetric]] } From 580520bcc5452de0f767cb8758894a8615d2a5a4 Mon Sep 17 00:00:00 2001 From: Yury DYLKO Date: Mon, 11 Nov 2024 17:43:57 +0300 Subject: [PATCH 2/3] fix: collection conversion fix for scala 2.13 --- .../org/checkita/dqf/config/validation/PostValidation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala b/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala index c15f54c..c8f2dd2 100644 --- a/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala +++ 
b/checkita-core/src/main/scala/org/checkita/dqf/config/validation/PostValidation.scala @@ -604,7 +604,7 @@ object PostValidation { .filter(c => c._1.toConfig.getString("kind").toLowerCase == "arima") .flatMap { case (tm, idx) => - val order = tm.toConfig.getStringList("order").asScala.map(_.toInt) + val order = tm.toConfig.getStringList("order").asScala.toSeq.map(_.toInt) val validations = Seq( ( (p: Int, d: Int, q: Int) => Seq(p, d, q).exists(_ < 0), From 1e69ac89ef0a789e0b3ea702517a660c202a7985 Mon Sep 17 00:00:00 2001 From: Yury DYLKO Date: Mon, 11 Nov 2024 18:07:56 +0300 Subject: [PATCH 3/3] fix: docs update --- docs/03-job-configuration/08-Metrics.md | 20 +++++++- docs/ru/index.md | 64 ------------------------- 2 files changed, 18 insertions(+), 66 deletions(-) delete mode 100644 docs/ru/index.md diff --git a/docs/03-job-configuration/08-Metrics.md b/docs/03-job-configuration/08-Metrics.md index c663833..f67e687 100644 --- a/docs/03-job-configuration/08-Metrics.md +++ b/docs/03-job-configuration/08-Metrics.md @@ -801,10 +801,26 @@ Thus, trend metrics are defined `trend` subsection using following set of parame * `id` - *Required*. Trend metric ID; * `description` - *Optional*. Trend metric description. -* `kind` - *Required*. Kind of statistic to be calculated over historical metric results. - * Available trend metric kinds are: `avg`, `std`, `min`, `max`, `sum`, `median`, `firstQuartile`, `thirdQuartile`, `quantile`. +* `kind` - *Required*. Kind of statistic to be calculated over historical metric results. + Available trend metric kinds are: + * `avg` - average of the collected results, + * `std` - standard deviation of the collected results, + * `min` - minimum value from the collected results, + * `max` - maximum value from the collected results, + * `sum` - sum of the collected results, + * `median` - median value of the collected results, + * `firstQuartile` - first quartile of the collected results, + * `thirdQuartile` - third quartile of the collected results, + * `quantile` - arbitrary quantile of the collected results. It is required to set quantile parameter as shown below. + * `linreg` - predicts metric result based on linear regression model trained on collected results. + `referenceDate` is transformed to epoch time and used to as X-values. Thus, metric result is + predicted for current value of `referenceData`. + * `arima` - predicts metric result based on ARIMA model trained on collected results. It is required + to provide ARIMA model order parameters: auto-regression (p) order, integration (d) order and + moving average (q) order as shown below. It is up to user to chose optimal ARIMA model parameters. * `quantile` - *Required*. **ONLY FOR `quantile` TREND METRIC**. Quantile to compute over historical metric results (must be a number in range `[0, 1]`). +* `order` - *Required*. **ONLY FOR `arima` TREND METRIC**. List of ARIMA model order parameters (AR, I, MA): `[p, d, q]`. * `lookupMetric` - *Required*. Lookup metric ID: metric which results will be pulled from DQ storage. * `rule` - *Required*. The rule for loading historical metric results from DQ storage. There are two rules supported: * `record` - loads specified number of historical metric result records. diff --git a/docs/ru/index.md b/docs/ru/index.md deleted file mode 100644 index 8002431..0000000 --- a/docs/ru/index.md +++ /dev/null @@ -1,64 +0,0 @@ -# Home - -**Актуальная версия: 1.7.2** - -> Документация на русском языке находится в стадии разработки. Пожалуйста, пользуйтесь документацией на английском. 
To ensure the quality of big data, one has to compute a large number of metrics and checks over huge datasets, which is a challenging task in itself.

Checkita is a Data Quality framework that addresses this task by formalizing and simplifying the process of connecting to and reading data from various sources, describing metrics and checks over the data in those sources, and sending results and notifications through various channels.

Thus, **Checkita** computes various metrics and checks over data (both structured and unstructured). The framework performs distributed computations over the data in a "single pass", using Spark as its computation core. Hocon configuration files are used both to describe application settings and to describe the pipeline of metric and check calculations. Calculation results are stored in the framework's dedicated database and can also be sent to users through various channels: file (local FS, HDFS, S3), Email, Mattermost, Kafka.

Using Spark as the computation core makes it possible to compute metrics and checks at the level of "raw" data, without requiring any SQL abstractions over the data (such as Hive or Impala), which in turn can hide certain data errors (for example, bad formatting or schema mismatches).

Checkita allows you to:

* Read data from various sources (HDFS, S3, Hive, Jdbc, Kafka) and in various formats (text, orc, parquet, avro).
* Apply SQL queries to the data, thereby building derived "virtual sources" of data. This functionality is implemented via the Spark DataFrame API.
* Compute a wide range of metrics over the data, as well as compose metrics.
* Run checks over the data based on the computed metrics.
* Run checks based on previous calculation results (anomaly detection in data).
* Store calculation results in the framework's database and send them through other channels (HDFS, S3, Hive, Kafka, Email, Mattermost).

Checkita is developed with a focus on integration into ETL pipelines and data catalog systems:

* The framework uses Spark and can be run as a regular Spark application. Spark, in turn, is the most widely adopted solution for distributed big data processing.
* No-code pipeline configuration via Hocon files, which can easily be composed and versioned in a VCS.
* The dedicated results database can be used to further expose these results via dashboards or simple UIs.
* Built-in notification support (Email, Mattermost) allows users to be informed about data quality issues.
* Alternative result delivery channels, such as Kafka, can be used for integration with other services.

Another key feature of the Checkita framework is the ability to process both static (batch) and streaming data sources. Accordingly, two types of applications can be run: for batch and for streaming validation of data sources. ***The streaming mode of the framework is currently at an experimental stage, so changes are possible in this part of the framework.***

The framework is written in Scala 2.12 and uses Spark 2.4+ as its computation core.
The project has a parameterized build that allows building the framework for a specific Spark version, publishing the project to a given repository, and assembling an Uber-jar either with or without Spark dependencies.

**License**

The Checkita framework is distributed under the [GNU LGPL](../LICENSE.txt) license.

---

This project is a rethinking of the [Data Quality framework](https://github.com/agile-lab-dev/DataQuality) developed by Agile Lab, Italy.
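As a closing illustration of the extension point this patch introduces: any new trend model only needs to implement the `TrendMetricModel.infer` method. A hypothetical minimal model (not part of this patch) that simply carries the latest historical result forward might look like this:

```scala
import java.sql.Timestamp
import org.checkita.dqf.core.metrics.trend.TrendMetricModel

// Hypothetical example: naive "last value" forecast used purely for illustration.
object LastValueModel extends TrendMetricModel {
  override def infer(data: Seq[(Timestamp, Double)], ts: Timestamp): (Double, Option[String]) = {
    require(data.nonEmpty, "At least one historical metric result is required.")
    val (lastTs, lastValue) = data.maxBy(_._1.getTime) // most recent result wins
    lastValue -> Some(s"""{"basedOn": "$lastTs"}""")
  }
}
```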