feat: new db table to store metric errors (#31)
- added migration scripts to create a new table for storing metric errors.
- added a new enabler in the storage configuration to control whether
metric errors should be saved in DQ Storage.
- added a new enabler in the encryption configuration to control whether
data excerpts in metric errors should be encrypted.
- modified the class storing metric errors and created its Slick table
representation. Added an error hash field that is used as part of the
unique key to deduplicate error records. The error hash is an MD5 hash
string that is always computed from raw data excerpts, even if they are
encrypted later (see the hashing sketch below).
- added deduplication of finalized metric errors by their unique
constraint in order to avoid sending repeated data.
- modified the encryptor to make it faster (the AES key and
initialization vector are no longer re-generated for every encryption
attempt).
- fixed the partition values filter in HiveSourceReader.
- added more verbose logs for the case when some of the source keyFields
are not found among the dataframe columns.
- enhanced the JDBC manager upsert method to avoid stack overflow errors
when saving large sequences of results (quite a common case when storing
metric errors; see the batched-insert sketch below).
- added Source validation on read: checks that all keyFields are present
in the dataframe.
- refactored application settings logging.
- added an implicit class to convert Option instances to Result instances.
- updated documentation to reflect the changes.
gabb1er authored Mar 13, 2024
1 parent 5bd15e9 commit 37db1ed
Showing 32 changed files with 466 additions and 130 deletions.
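
A minimal sketch of the hashing and deduplication described above (the MetricError shape and helper names here are hypothetical; only the MD5-over-raw-excerpt rule and the (job_id, error_hash, reference_date) key come from this commit):

import java.security.MessageDigest
import java.sql.Timestamp

// Hypothetical, trimmed shape of a metric error record.
final case class MetricError(
  jobId: String,
  metricId: String,
  rowData: String,       // raw (unencrypted) data excerpt
  errorHash: String,
  referenceDate: Timestamp
)

object ErrorHashing {
  // The hash is always computed over the raw excerpt, before any encryption,
  // so the same error always produces the same hash.
  def md5Hex(rawExcerpt: String): String =
    MessageDigest.getInstance("MD5")
      .digest(rawExcerpt.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString

  // Deduplicate finalized errors by the same key the new table enforces
  // through its UNIQUE constraint: (job_id, error_hash, reference_date).
  def deduplicate(errors: Seq[MetricError]): Seq[MetricError] =
    errors.groupBy(e => (e.jobId, e.errorHash, e.referenceDate)).values.map(_.head).toSeq
}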
@@ -0,0 +1,15 @@
CREATE TABLE "${defaultSchema}"."results_metric_error"
(
"job_id" VARCHAR(512) NOT NULL,
"metric_id" VARCHAR(512) NOT NULL,
"source_id" VARCHAR(512),
"source_key_fields" VARCHAR(2048),
"metric_columns" VARCHAR(2048),
"status" VARCHAR(512) NOT NULL,
"message" TEXT NOT NULL,
"row_data" TEXT NOT NULL,
"error_hash" VARCHAR(512) NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
UNIQUE ("job_id", "error_hash", "reference_date")
);
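
The same DDL is repeated below for every supported database dialect, differing only in string/text and timestamp column types. The batched upsert mentioned in the commit message could lean on this unique constraint roughly as follows — a sketch assuming a PostgreSQL-style ON CONFLICT clause and plain JDBC; the actual JDBC manager is not shown in this diff, and the ErrorRow record is hypothetical:

import java.sql.{Connection, Timestamp}

// Hypothetical, trimmed error record (nullable columns omitted).
final case class ErrorRow(
  jobId: String, metricId: String, status: String, message: String,
  rowData: String, errorHash: String,
  referenceDate: Timestamp, executionDate: Timestamp
)

// Insert in fixed-size batches so that a very large error sequence never
// builds one enormous statement; the UNIQUE key silently drops duplicates.
def saveErrors(conn: Connection, errors: Seq[ErrorRow], batchSize: Int = 1000): Unit = {
  val sql =
    """INSERT INTO "results_metric_error"
      |  ("job_id", "metric_id", "status", "message", "row_data",
      |   "error_hash", "reference_date", "execution_date")
      |VALUES (?, ?, ?, ?, ?, ?, ?, ?)
      |ON CONFLICT ("job_id", "error_hash", "reference_date") DO NOTHING""".stripMargin
  val stmt = conn.prepareStatement(sql)
  try {
    errors.grouped(batchSize).foreach { batch =>
      batch.foreach { e =>
        stmt.setString(1, e.jobId);   stmt.setString(2, e.metricId)
        stmt.setString(3, e.status);  stmt.setString(4, e.message)
        stmt.setString(5, e.rowData); stmt.setString(6, e.errorHash)
        stmt.setTimestamp(7, e.referenceDate); stmt.setTimestamp(8, e.executionDate)
        stmt.addBatch()
      }
      stmt.executeBatch() // flush one batch at a time
    }
  } finally stmt.close()
}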
@@ -0,0 +1,15 @@
CREATE TABLE "${defaultSchema}"."results_metric_error"
(
"job_id" VARCHAR(512) NOT NULL,
"metric_id" VARCHAR(512) NOT NULL,
"source_id" VARCHAR(512),
"source_key_fields" VARCHAR(2048),
"metric_columns" VARCHAR(2048),
"status" VARCHAR(512) NOT NULL,
"message" VARCHAR(MAX) NOT NULL,
"row_data" VARCHAR(MAX) NOT NULL,
"error_hash" VARCHAR(512) NOT NULL,
"reference_date" DATETIME NOT NULL,
"execution_date" DATETIME NOT NULL,
UNIQUE ("job_id", "error_hash", "reference_date")
);
@@ -10,7 +10,7 @@ CREATE TABLE "${defaultSchema}"."job_state"
(
"job_id" VARCHAR(256) NOT NULL,
"config" TEXT NOT NULL,
"version_info" VARCHAR(512 NOT NULL,
"version_info" VARCHAR(512) NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
UNIQUE ("job_id", "reference_date")
@@ -0,0 +1,15 @@
CREATE TABLE "${defaultSchema}"."results_metric_error"
(
"job_id" VARCHAR(256) NOT NULL,
"metric_id" VARCHAR(256) NOT NULL,
"source_id" VARCHAR(512),
"source_key_fields" VARCHAR(2048),
"metric_columns" VARCHAR(2048),
"status" VARCHAR(512) NOT NULL,
"message" TEXT NOT NULL,
"row_data" TEXT NOT NULL,
"error_hash" VARCHAR(256) NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
UNIQUE ("job_id", "error_hash", "reference_date")
);
@@ -0,0 +1,15 @@
CREATE TABLE "${defaultSchema}"."results_metric_error"
(
"job_id" VARCHAR(512) NOT NULL,
"metric_id" VARCHAR(512) NOT NULL,
"source_id" VARCHAR(512),
"source_key_fields" VARCHAR(2048),
"metric_columns" VARCHAR(2048),
"status" VARCHAR(512) NOT NULL,
"message" CLOB NOT NULL,
"row_data" CLOB NOT NULL,
"error_hash" VARCHAR(512) NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
UNIQUE ("job_id", "error_hash", "reference_date")
);
@@ -0,0 +1,15 @@
CREATE TABLE "${defaultSchema}"."results_metric_error"
(
"job_id" TEXT NOT NULL,
"metric_id" TEXT NOT NULL,
"source_id" TEXT,
"source_key_fields" TEXT,
"metric_columns" TEXT,
"status" TEXT NOT NULL,
"message" TEXT NOT NULL,
"row_data" TEXT NOT NULL,
"error_hash" TEXT NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
UNIQUE ("job_id", "error_hash", "reference_date")
);
@@ -4,5 +4,5 @@ CREATE TABLE "job_state"
"config" TEXT NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
- UNIQUE ("job_id", "reference_date")
+ UNIQUE ("job_id", "reference_date") ON CONFLICT REPLACE
);
@@ -6,7 +6,7 @@ CREATE TABLE "job_state"
"version_info" TEXT NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
- UNIQUE ("job_id", "reference_date")
+ UNIQUE ("job_id", "reference_date") ON CONFLICT REPLACE
);
INSERT INTO "job_state" (
"job_id",
@@ -0,0 +1,15 @@
CREATE TABLE "results_metric_error"
(
"job_id" TEXT NOT NULL,
"metric_id" TEXT NOT NULL,
"source_id" TEXT,
"source_key_fields" TEXT,
"metric_columns" TEXT,
"status" TEXT NOT NULL,
"message" TEXT NOT NULL,
"row_data" TEXT NOT NULL,
"error_hash" TEXT NOT NULL,
"reference_date" TIMESTAMP NOT NULL,
"execution_date" TIMESTAMP NOT NULL,
UNIQUE ("job_id", "error_hash", "reference_date") ON CONFLICT REPLACE
);
@@ -22,7 +22,7 @@ import scala.util.Try
* @param aggregatedKafkaOutput Enables sending aggregates messages for Kafka Targets
* (one per each target type, except checkAlerts where
* one message per checkAlert will be sent)
- * @param enableCaseSensitivity Enable columns case sensitivity
+ * @param enableCaseSensitivity Enables columns case sensitivity
* @param errorDumpSize Maximum number of errors to be collected per single metric.
* @param outputRepartition Sets the number of partitions when writing outputs. By default writes single file.
* @param storageConfig Configuration of connection to Data Quality Storage
@@ -33,6 +33,9 @@ class ConfigEncryptor(secret: EncryptionKey, keyFields: Seq[String] = Seq("passw
private val ivLength = 16
private val saltTail = "Checkita"

+ private val aesSecretKey = getSecretKeySpec(secret)
+ private val initialIv = generateIV

/**
* Function that checks whether field value should be encrypted based on field name
*/
@@ -82,48 +85,42 @@ class ConfigEncryptor(secret: EncryptionKey, keyFields: Seq[String] = Seq("passw
* Encrypts string value using AES256 algorithm. Uses user-defined secret key for encryption.
*
* @param value String value to encrypt
- * @param key User-defined secret key
* @return Either encrypted string or a list of encryption errors.
*/
- private def encrypt(value: String, key: EncryptionKey): String = {
-   val aesKey = getSecretKeySpec(key)
-   val (iv, ivSpec) = generateIV
+ def encrypt(value: String): String = {
val c = Cipher.getInstance(cipher)
- c.init(Cipher.ENCRYPT_MODE, aesKey, ivSpec)
+ c.init(Cipher.ENCRYPT_MODE, aesSecretKey, initialIv._2)
val cipherBytes = c.doFinal(value.getBytes(StandardCharsets.UTF_8))
val encrypted = new Array[Byte](ivLength + cipherBytes.length)

// Prepend IV to cipher output:
- Array.copy(iv, 0, encrypted, 0, ivLength)
+ Array.copy(initialIv._1, 0, encrypted, 0, ivLength)
Array.copy(cipherBytes, 0, encrypted, ivLength, cipherBytes.length)

// return base64-encoded string from encrypted byte array:
Base64.getEncoder.encodeToString(encrypted)
} // .toResult(preMsg = s"Unable encrypt string due to following error:")
}

/**
* Decrypts string value using AES256 algorithm. Uses user-defined secret key for decryption.
*
* @param value String value to decrypt
- * @param key User-defined secret key
* @return Either decrypted string or a list of decryption errors.
*/
- private def decrypt(value: String, key: EncryptionKey): String = {
+ def decrypt(value: String): String = {
val encrypted = Base64.getDecoder.decode(value)
- val aesKey = getSecretKeySpec(key)
val (_, ivSpec) = extractIV(encrypted)

val c = Cipher.getInstance(cipher)
- c.init(Cipher.DECRYPT_MODE, aesKey, ivSpec)
+ c.init(Cipher.DECRYPT_MODE, aesSecretKey, ivSpec)

// extract only portion of array related to encrypted string (without IV bytes)
val cipherBytes = new Array[Byte](encrypted.length - ivLength)
Array.copy(encrypted, ivLength, cipherBytes, 0, cipherBytes.length)

// return decrypted data as a string:
new String(c.doFinal(cipherBytes), StandardCharsets.UTF_8)
- } // .toResult(preMsg = "Unable to decrypt string due to following error:")
+ }

/**
* Recursive function used to traverse Typesafe configuration and substitute values if necessary.
Expand Down Expand Up @@ -169,7 +166,7 @@ class ConfigEncryptor(secret: EncryptionKey, keyFields: Seq[String] = Seq("passw
* @return New configuration instance with sensitive fields being encrypted.
*/
def encryptConfig(config: Config): Result[Config] = Try(
- substituteConfigValues(shouldEncrypt, (value: String) => encrypt(value, secret))(config)
+ substituteConfigValues(shouldEncrypt, encrypt)(config)
).toResult(preMsg = s"Unable encrypt configuration due to following error:")

/**
Expand All @@ -179,7 +176,7 @@ class ConfigEncryptor(secret: EncryptionKey, keyFields: Seq[String] = Seq("passw
* @return New configuration instance with sensitive fields being decrypted.
*/
def decryptConfig(config: Config): Result[Config] = Try(
- substituteConfigValues(shouldEncrypt, (value: String) => decrypt(value, secret))(config)
+ substituteConfigValues(shouldEncrypt, decrypt)(config)
).toResult(preMsg = s"Unable decrypt configuration due to following error:")
}
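
Now that encrypt and decrypt no longer take a key argument, a round trip is straightforward (a usage sketch; mySecretKey stands in for a real EncryptionKey value):

// Key spec and IV are derived once per ConfigEncryptor instance.
val encryptor = new ConfigEncryptor(mySecretKey)

val cipherText = encryptor.encrypt("jdbc-password") // base64(IV ++ AES cipher bytes)
val plainText  = encryptor.decrypt(cipherText)      // == "jdbc-password"

The trade-off behind the speed-up: caching the key spec and IV avoids re-deriving them on every call, but reusing a single IV means identical plaintexts now encrypt to identical ciphertexts. Decryption is unaffected, since the IV is still read back from each encrypted message.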

@@ -6,14 +6,14 @@ import eu.timepit.refined.auto._
/**
* Application-level configuration for switchers (enablers)
*
* @param allowSqlQueries Enables arbitrary SQL queries in virtual sources
* @param allowNotifications Enables notifications to be sent from DQ application
* @param aggregatedKafkaOutput Enables sending aggregates messages for Kafka Targets
* (one per each target type, except checkAlerts where
* one message per checkAlert will be sent)
* @param enableCaseSensitivity Enable columns case sensitivity
* @param errorDumpSize Maximum number of errors to be collected per single metric per partition.
* @param outputRepartition Sets the number of partitions when writing outputs. By default writes single file.
*/
final case class Enablers(
allowSqlQueries: Boolean = false,
@@ -6,10 +6,13 @@ import ru.raiffeisen.checkita.config.RefinedTypes.EncryptionKey
/**
* Application-level configuration describing encryption sensitive fields
*
* @param secret Secret string used to encrypt/decrypt sensitive fields
* @param keyFields List of key fields used to identify fields that requires encryption/decryption.
+ * @param encryptErrorData Boolean flag indicating whether rowData (contains excerpts from data sources)
+ * field in metric errors should be encrypted.
*/
final case class Encryption(
secret: EncryptionKey,
- keyFields: Seq[String] = Seq("password", "secret")
+ keyFields: Seq[String] = Seq("password", "secret"),
+ encryptErrorData: Boolean = false
)
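
For example, opting in to excerpt encryption amounts to setting the new flag (placeholder values; EncryptionKey is a refined string type in the real code):

val encryption = Encryption(
  secret = "some-sufficiently-long-secret", // placeholder secret material
  keyFields = Seq("password", "secret"),    // substrings that mark sensitive config fields
  encryptErrorData = true                   // also encrypt rowData excerpts in metric errors
)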
@@ -6,17 +6,22 @@ import ru.raiffeisen.checkita.config.Enums.DQStorageType

/**
* Application-level configuration describing connection to history database.
*
* @param dbType Type of database used to store DQ data (one of the supported RDBMS)
* @param url Connection URL (without protocol identifiers)
* @param username Username to connect to database with (if required)
* @param password Password to connect to database with (if required)
* @param schema Schema where data quality tables are located (if required)
+ * @param saveErrorsToStorage Enables metric errors to be stored in storage database.
+ * Be careful when storing metric errors in storage database
+ * as this might overload the storage.
*/
final case class StorageConfig(
dbType: DQStorageType,
url: URI,
username: Option[NonEmptyString],
password: Option[NonEmptyString],
- schema: Option[NonEmptyString]
+ schema: Option[NonEmptyString],
+ saveErrorsToStorage: Boolean = false
)
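
Likewise, persisting metric errors is opt-in (placeholder values; the DQStorageType member name is hypothetical and the refined option types are simplified to plain strings):

import java.net.URI

val storage = StorageConfig(
  dbType = DQStorageType.PostgreSQL,     // hypothetical enum member name
  url    = new URI("localhost:5432/dq"), // connection URL without protocol identifier
  username = Some("dq_user"),
  password = Some("dq_pass"),
  schema   = Some("public"),
  saveErrorsToStorage = true             // mind the overload warning in the scaladoc above
)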

@@ -4,6 +4,7 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.logging.log4j.Level
import org.apache.spark.sql.SparkSession
import ru.raiffeisen.checkita.appsettings.{AppSettings, VersionInfo}
+ import ru.raiffeisen.checkita.config.ConfigEncryptor
import ru.raiffeisen.checkita.config.IO.readJobConfig
import ru.raiffeisen.checkita.config.Parsers._
import ru.raiffeisen.checkita.config.appconf.AppConfig
@@ -102,6 +103,7 @@ class DQContext(settings: AppSettings, spark: SparkSession, fs: FileSystem) exte
case Some(conf) => Seq(
"* Storage configuration:",
s" - Run DB migration: ${settings.doMigration}",
s" - Save Errors to DB: ${conf.saveErrorsToStorage}",
s" - Data base type: ${conf.dbType.toString}",
s" - Connection url: ${conf.url.value}",
) ++ conf.username.map(
@@ -147,6 +149,16 @@ class DQContext(settings: AppSettings, spark: SparkSession, fs: FileSystem) exte
"* Extra variables:",
s" - List of all available variables (values are not shown intentionally): $extraVarsList"
)

+ val logEncryption = Seq(
+   "* Encryption configuration:"
+ ) ++ settings.encryption.map { eCfg =>
+   val kf = eCfg.keyFields.mkString("[", ", ", "]")
+   Seq(
+     s" - Encrypt configuration keys that contain following substrings: $kf",
+     s" - Encrypt metric errors row data: ${eCfg.encryptErrorData}",
+   )
+ }.getOrElse(Seq(" - Encryption configuration is not set and, therefore, results will not be encrypted."))

(
logGeneral ++
@@ -157,7 +169,8 @@
logStorageConf ++
logEmailConfig ++
logMMConfig ++
- logExtraVars
+ logExtraVars ++
+ logEncryption
).foreach(msg => log.info(s"$settingsStage $msg"))
}
