From 0980175034daaaa3cdf82e52d51802069233502b Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 08:30:46 +0800 Subject: [PATCH 01/17] Add in generateFirst flag to false in insta-integration --- insta-integration.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/insta-integration.yaml b/insta-integration.yaml index dbf6b12..f247911 100644 --- a/insta-integration.yaml +++ b/insta-integration.yaml @@ -6,6 +6,7 @@ run: env: PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml TASK_FOLDER_PATH: app/src/test/resources/sample/task + generateFirst: false test: validation: postgres: From a13cd4517a357fafb2533c2f04248f8a22adf26b Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 08:33:51 +0800 Subject: [PATCH 02/17] Use gradle cache in github action --- .github/workflows/check.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index ed9880a..d34e513 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -10,8 +10,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Run build - run: "./gradlew clean :app:shadowJar" + - name: Gradle build with cache + uses: burrunan/gradle-cache-action@v1 + with: + arguments: "clean :app:shadowJar" - name: Run integration tests id: tests uses: data-catering/insta-integration@v1 From daa7ddd60a3d8d9ed59992fb0d7287b8e79c3b21 Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 09:28:51 +0800 Subject: [PATCH 03/17] Add in missing account_status from balances test table --- app/src/test/resources/sample/sql/postgres/customer.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/app/src/test/resources/sample/sql/postgres/customer.sql b/app/src/test/resources/sample/sql/postgres/customer.sql index 2830421..e85bab9 100644 --- a/app/src/test/resources/sample/sql/postgres/customer.sql +++ b/app/src/test/resources/sample/sql/postgres/customer.sql @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS account.balances ( account_number VARCHAR(20) UNIQUE NOT NULL, create_time TIMESTAMP, + account_status VARCHAR(10), balance DOUBLE PRECISION, PRIMARY KEY (account_number, create_time) ); From 169831d8d936b4b294a1550262d74dbab8f0b2f6 Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 09:42:34 +0800 Subject: [PATCH 04/17] Use debug log level for insta-integration --- insta-integration.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/insta-integration.yaml b/insta-integration.yaml index f247911..6054cb4 100644 --- a/insta-integration.yaml +++ b/insta-integration.yaml @@ -6,6 +6,7 @@ run: env: PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml TASK_FOLDER_PATH: app/src/test/resources/sample/task + LOG_LEVEL: debug generateFirst: false test: validation: From a35b2b84721cf8b6fa903f360cb112706cc5d298 Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 09:46:34 +0800 Subject: [PATCH 05/17] Try set transactions acocunt_number to be unique --- .../task/postgres/postgres-balance-transaction-task.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml index c4cd1ed..7243efe 100644 --- a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml +++ b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml @@ -39,6 +39,9 @@ steps: schema: fields: - name: "account_number" + generator: + options: + isUnique: true - name: "create_time" type: "timestamp" - name: "transaction_id" From ce4e7fccacd9dab86e0dfd4219d080ef475078bc Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 10:28:22 +0800 Subject: [PATCH 06/17] Take out isUnique from account.balances test, check if random amount of records is defined before trying to generate more records --- .../core/generator/BatchDataProcessor.scala | 19 +++++++++++-------- .../core/util/UniqueFieldsUtil.scala | 6 ++++-- .../postgres-balance-transaction-task.yaml | 4 ---- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala index f2f1e7c..85d14ca 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala @@ -68,14 +68,17 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String s"new-num-records=${additionalDf.count()}, actual-num-records=$dfRecordCount") } - while (targetNumRecords != dfRecordCount && retries < maxRetries) { - retries += 1 - generateAdditionalRecords() - } - if (targetNumRecords != dfRecordCount && retries == maxRetries) { - LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " + - s"Can be due to limited number of potential unique records, " + - s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}") + //if random amount of records, don't try to regenerate more records + if (s.count.generator.isEmpty && s.count.perColumn.exists(_.generator.isEmpty)) { + while (targetNumRecords != dfRecordCount && retries < maxRetries) { + retries += 1 + generateAdditionalRecords() + } + if (targetNumRecords != dfRecordCount && retries == maxRetries) { + LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " + + s"Can be due to limited number of potential unique records, " + + s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}") + } } trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount)) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala index 32be44d..35e2202 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala @@ -100,8 +100,10 @@ class UniqueFieldsUtil(plan: Plan, executableTasks: List[(TaskSummary, Task)])(i val uniqueKeys = step.gatherUniqueFields val uniqueKeyUf = if (uniqueKeys.nonEmpty) uniqueKeys.map(u => UniqueFields(t._1.dataSourceName, step.name, List(u))) else List() val allKeys = primaryKeyUf ++ uniqueKeyUf - LOGGER.debug(s"Found unique fields that require unique values, " + - s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}") + if (allKeys.nonEmpty) { + LOGGER.debug(s"Found unique fields that require unique values, " + + s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}") + } allKeys }) }) diff --git a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml index 7243efe..1697365 100644 --- a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml +++ b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml @@ -13,7 +13,6 @@ steps: type: "regex" options: regex: "ACC1[0-9]{5,10}" - isUnique: true - name: "create_time" type: "timestamp" - name: "account_status" @@ -39,9 +38,6 @@ steps: schema: fields: - name: "account_number" - generator: - options: - isUnique: true - name: "create_time" type: "timestamp" - name: "transaction_id" From 42ed765157c3b4a41f436b062a5d33e9852243fe Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 11:23:34 +0800 Subject: [PATCH 07/17] Fix if statement for checking random amount of records --- .../datacaterer/core/generator/BatchDataProcessor.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala index 85d14ca..97a32ca 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala @@ -69,7 +69,7 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String } //if random amount of records, don't try to regenerate more records - if (s.count.generator.isEmpty && s.count.perColumn.exists(_.generator.isEmpty)) { + if (s.count.generator.isEmpty && s.count.perColumn.forall(_.generator.isEmpty)) { while (targetNumRecords != dfRecordCount && retries < maxRetries) { retries += 1 generateAdditionalRecords() @@ -79,6 +79,8 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String s"Can be due to limited number of potential unique records, " + s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}") } + } else { + LOGGER.debug("Random amount of records generated, not attempting to generate more records") } trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount)) From a54c144f0b88e574ab284f15fc840be00bec7529 Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 11:32:59 +0800 Subject: [PATCH 08/17] Remove unique from account_number in account.transactions table def --- app/src/test/resources/sample/sql/postgres/customer.sql | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/app/src/test/resources/sample/sql/postgres/customer.sql b/app/src/test/resources/sample/sql/postgres/customer.sql index e85bab9..d5bffb5 100644 --- a/app/src/test/resources/sample/sql/postgres/customer.sql +++ b/app/src/test/resources/sample/sql/postgres/customer.sql @@ -32,12 +32,11 @@ CREATE TABLE IF NOT EXISTS account.balances CREATE TABLE IF NOT EXISTS account.transactions ( - account_number VARCHAR(20) UNIQUE NOT NULL, + account_number VARCHAR(20) NOT NULL REFERENCES account.balances (account_number), create_time TIMESTAMP, transaction_id VARCHAR(20), amount DOUBLE PRECISION, - PRIMARY KEY (account_number, create_time, transaction_id), - CONSTRAINT fk_txn_account_number FOREIGN KEY (account_number) REFERENCES account.balances (account_number) + PRIMARY KEY (account_number, create_time, transaction_id) ); CREATE TABLE IF NOT EXISTS account.mapping From 791fbdb3200a156203eaf44e9843a61eec7013d2 Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 11:37:26 +0800 Subject: [PATCH 09/17] Add in APPLICATION_CONFIG_PATH to env for insta-integration --- insta-integration.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/insta-integration.yaml b/insta-integration.yaml index 6054cb4..286a2da 100644 --- a/insta-integration.yaml +++ b/insta-integration.yaml @@ -6,7 +6,7 @@ run: env: PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml TASK_FOLDER_PATH: app/src/test/resources/sample/task - LOG_LEVEL: debug + APPLICATION_CONFIG_PATH: app/src/main/resources/application.conf generateFirst: false test: validation: From c26be6a730b31c9660817adc38352b3bdf855ea1 Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 11:54:20 +0800 Subject: [PATCH 10/17] Update application.conf files --- app/src/main/resources/application.conf | 139 +++++++++++++----------- app/src/test/resources/application.conf | 13 ++- 2 files changed, 85 insertions(+), 67 deletions(-) diff --git a/app/src/main/resources/application.conf b/app/src/main/resources/application.conf index 14898ef..87503ca 100644 --- a/app/src/main/resources/application.conf +++ b/app/src/main/resources/application.conf @@ -1,97 +1,129 @@ flags { - enableGeneratePlanAndTasks = false - enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS} enableCount = true enableCount = ${?ENABLE_COUNT} enableGenerateData = true enableGenerateData = ${?ENABLE_GENERATE_DATA} + enableGeneratePlanAndTasks = false + enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS} enableRecordTracking = false enableRecordTracking = ${?ENABLE_RECORD_TRACKING} enableDeleteGeneratedRecords = false enableDeleteGeneratedRecords = ${?ENABLE_DELETE_GENERATED_RECORDS} enableFailOnError = true enableFailOnError = ${?ENABLE_FAIL_ON_ERROR} + enableUniqueCheck = true + enableUniqueCheck = ${?ENABLE_UNIQUE_CHECK} enableSinkMetadata = true - enableSinkMetadata = ${?ENABLED_SINK_METADATA} + enableSinkMetadata = ${?ENABLE_SINK_METADATA} enableSaveReports = true - enableSaveReports = ${?ENABLED_SAVE_REPORTS} + enableSaveReports = ${?ENABLE_SAVE_REPORTS} enableValidation = false - enableValidation = ${?ENABLED_VALIDATION} + enableValidation = ${?ENABLE_VALIDATION} + enableGenerateValidations = false + enableGenerateValidations = ${?ENABLE_GENERATE_VALIDATIONS} + enableAlerts = false + enableAlerts = ${?ENABLE_ALERTS} } folders { generatedPlanAndTaskFolderPath = "/tmp" generatedPlanAndTaskFolderPath = ${?GENERATED_PLAN_AND_TASK_FOLDER_PATH} - planFilePath = "/plan/customer-create-plan.yaml" + planFilePath = "app/src/test/resources/sample/plan/customer-create-plan.yaml" planFilePath = ${?PLAN_FILE_PATH} - taskFolderPath = "/task" + taskFolderPath = "app/src/test/resources/sample/task" taskFolderPath = ${?TASK_FOLDER_PATH} recordTrackingFolderPath = "/tmp/data/generated/recordTracking" recordTrackingFolderPath = ${?RECORD_TRACKING_FOLDER_PATH} + recordTrackingForValidationFolderPath = "/tmp/data/validation/recordTracking" + recordTrackingForValidationFolderPath = ${?RECORD_TRACKING_VALIDATION_FOLDER_PATH} generatedReportsFolderPath = "app/src/test/resources/sample/html" generatedReportsFolderPath = ${?GENERATED_REPORTS_FOLDER_PATH} - validationFolderPath = "app/src/test/resources/sample/validation" - validationFolderPath = ${?VALIDATION_FOLDER_PATH} } metadata { numRecordsFromDataSource = 10000 - numRecordsFromDataSource = ${?METADATA_NUM_RECORDS_FROM_DATA_SOURCE} + numRecordsFromDataSource = ${?NUM_RECORDS_FROM_DATA_SOURCE} numRecordsForAnalysis = 10000 - numRecordsForAnalysis = ${?METADATA_NUM_RECORDS_FOR_ANALYSIS} + numRecordsForAnalysis = ${?NUM_RECORDS_FOR_ANALYSIS} oneOfDistinctCountVsCountThreshold = 0.1 - oneOfDistinctCountVsCountThreshold = ${?METADATA_ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD} + oneOfDistinctCountVsCountThreshold = ${?ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD} + oneOfMinCount = 1000 + oneOfMinCount = ${?ONE_OF_MIN_COUNT} + numGeneratedSamples = 10 + numGeneratedSamples = ${?NUM_GENERATED_SAMPLES} } generation { - numRecordsPerBatch = 100000 - numRecordsPerBatch = ${?GENERATION_NUM_RECORDS_PER_BATCH} + numRecordsPerBatch = 1000000 + numRecordsPerBatch = ${?NUM_RECORDS_PER_BATCH} } -validation {} -alert {} +validation { + numSampleErrorRecords = 5 + numSampleErrorRecords = ${?NUM_SAMPLE_ERROR_RECORDS} + enableDeleteRecordTrackingFiles = true + enableDeleteRecordTrackingFiles = ${?ENABLE_DELETE_RECORD_TRACKING_FILES} +} + +alert { + triggerOn = "all" + triggerOn = ${?ALERT_TRIGGER_ON} + slackAlertConfig { + token = "" + token = ${?ALERT_SLACK_TOKEN} + channels = [] + channels = ${?ALERT_SLACK_CHANNELS} + } +} runtime { master = "local[*]" master = ${?DATA_CATERER_MASTER} config { - "spark.sql.cbo.enabled" = "true" - "spark.sql.adaptive.enabled" = "true" - "spark.sql.cbo.planStats.enabled" = "true" - "spark.sql.legacy.allowUntypedScalaUDF" = "true" - "spark.sql.statistics.histogram.enabled" = "true" - "spark.sql.shuffle.partitions" = "10" - "spark.sql.catalog.postgres" = "" - "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog" - "spark.hadoop.fs.s3a.directory.marker.retention" = "keep" - "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true" - #"spark.hadoop.fs.defaultFS" = "s3a://my-bucket" + "spark.sql.cbo.enabled": "true", + "spark.sql.adaptive.enabled": "true", + "spark.sql.cbo.planStats.enabled": "true", + "spark.sql.legacy.allowUntypedScalaUDF": "true", + "spark.sql.legacy.allowParameterlessCount": "true", + "spark.sql.statistics.histogram.enabled": "true", + "spark.sql.shuffle.partitions": "10", + "spark.sql.catalog.postgres": "", + "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog", + "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type": "hadoop", + "spark.hadoop.fs.s3a.directory.marker.retention": "keep", + "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true", + "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem", + "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem", + "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + } +} + +json { + json { + } +} + +csv { + csv { + } +} + +parquet { + parquet { } } -# connection type jdbc { -# connection name - postgres { -# connection details + postgresCustomer { url = "jdbc:postgresql://localhost:5432/customer" url = ${?POSTGRES_URL} user = "postgres" - user = ${?POSTGRES_USERNAME} + user = ${?POSTGRES_USER} password = "postgres" password = ${?POSTGRES_PASSWORD} driver = "org.postgresql.Driver" } - postgresDvd { - url = "jdbc:postgresql://localhost:5432/dvdrental" - url = ${?POSTGRES_URL} - user = "postgres" - user = ${?POSTGRES_USERNAME} - password = "postgres" - password = ${?POSTGRES_PASSWORD} - driver = "org.postgresql.Driver" - stringtype = "unspecified" - } mysql { url = "jdbc:mysql://localhost:3306/customer" url = ${?MYSQL_URL} @@ -103,6 +135,7 @@ jdbc { } } + org.apache.spark.sql.cassandra { cassandra { spark.cassandra.connection.host = "localhost" @@ -140,30 +173,10 @@ jms { kafka { kafka { - kafka.bootstrap.servers = "localhost:9092" + kafka.bootstrap.servers = "localhost:29092" kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} } } -parquet { - parquet { - path = "app/src/test/resources/sample" - path = ${?PARQUET_PATH} - } -} - -json { - json { - path = "app/src/test/resources/sample" - path = ${?JSON_PATH} - } -} - -csv { - csv { - path = "app/src/test/resources/sample" - path = ${?CSV_PATH} - } -} datastax-java-driver.advanced.metadata.schema.refreshed-keyspaces = [ "/.*/" ] diff --git a/app/src/test/resources/application.conf b/app/src/test/resources/application.conf index 2cc0ffb..cc30ce9 100644 --- a/app/src/test/resources/application.conf +++ b/app/src/test/resources/application.conf @@ -32,10 +32,15 @@ runtime{ "spark.sql.legacy.allowUntypedScalaUDF" = "true" "spark.sql.statistics.histogram.enabled" = "true" "spark.sql.shuffle.partitions" = "10" - "spark.sql.catalog.postgres" = "" - "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog" - "spark.hadoop.fs.s3a.directory.marker.retention" = "keep" - "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true" + "spark.sql.catalog.postgres": "", + "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog", + "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type": "hadoop", + "spark.hadoop.fs.s3a.directory.marker.retention": "keep", + "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true", + "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem", + "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem", + "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" } } From 6ea13d1059279538ffd1a71ff448ba6b97beda4c Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 12:05:58 +0800 Subject: [PATCH 11/17] Update data source name to postgres in main application.conf --- app/src/main/resources/application.conf | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/app/src/main/resources/application.conf b/app/src/main/resources/application.conf index 87503ca..3966ef0 100644 --- a/app/src/main/resources/application.conf +++ b/app/src/main/resources/application.conf @@ -109,13 +109,28 @@ csv { } } +delta { + delta { + } +} + +iceberg { + iceberg { + } +} + +orc { + orc { + } +} + parquet { parquet { } } jdbc { - postgresCustomer { + postgres { url = "jdbc:postgresql://localhost:5432/customer" url = ${?POSTGRES_URL} user = "postgres" From d1f1cdbeb4ff66f8a2a3c8d7dc7eb309429b73cb Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 12:37:20 +0800 Subject: [PATCH 12/17] Try default back to local resource path for html resources --- .../result/DataGenerationResultWriter.scala | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala index 6dbd151..85e887d 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala @@ -65,24 +65,26 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig private def copyHtmlResources(fileSystem: FileSystem, folder: String): Unit = { val resources = List("main.css", "data_catering_transparent.svg") - if (!foldersConfig.generatedReportsFolderPath.equalsIgnoreCase(DEFAULT_GENERATED_REPORTS_FOLDER_PATH)) { - resources.foreach(resource => { - val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource") - val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI)) - val resourcePath = tryLocalUri match { - case Failure(_) => - defaultResourcePath - case Success(value) => - Try(value.getName) match { - case Failure(_) => defaultResourcePath - case Success(name) => - if (name.startsWith("jar:")) defaultResourcePath else value - } - } - val destination = s"file:///$folder/$resource" - fileSystem.copyFromLocalFile(resourcePath, new Path(destination)) - }) - } + resources.foreach(resource => { + val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource") + val localResourcePath = new Path(s"file://app/src/main/resources/report/$resource") + val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI)) + val resourcePath = tryLocalUri match { + case Failure(_) => + Try(defaultResourcePath.toUri) match { + case Failure(_) => localResourcePath + case Success(_) => defaultResourcePath + } + case Success(value) => + Try(value.getName) match { + case Failure(_) => defaultResourcePath + case Success(name) => + if (name.startsWith("jar:")) defaultResourcePath else value + } + } + val destination = s"file:///$folder/$resource" + fileSystem.copyFromLocalFile(resourcePath, new Path(destination)) + }) } private def writeToFile(fileSystem: FileSystem, folderPath: String)(fileName: String, content: Node): Unit = { From c6fb03943b1a61ecc3c2c6374c158c0935931d1e Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 12:53:40 +0800 Subject: [PATCH 13/17] Try default back to local resource path for html resources --- .../core/generator/result/DataGenerationResultWriter.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala index 85e887d..0f3f278 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala @@ -77,7 +77,11 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig } case Success(value) => Try(value.getName) match { - case Failure(_) => defaultResourcePath + case Failure(_) => + Try(defaultResourcePath.toUri) match { + case Failure(_) => localResourcePath + case Success(_) => defaultResourcePath + } case Success(name) => if (name.startsWith("jar:")) defaultResourcePath else value } From 786648a521ba1a3a691b53e01cb296ffec31f9ba Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 13:32:46 +0800 Subject: [PATCH 14/17] Add in data source name and options to error validation details, write the main css and svg resources directly into report folder --- .../result/DataGenerationResultWriter.scala | 38 +--- .../generator/result/ResultHtmlWriter.scala | 179 ++++++++++++++++++ .../datacaterer/core/model/Constants.scala | 4 + .../core/model/ValidationModels.scala | 17 +- 4 files changed, 199 insertions(+), 39 deletions(-) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala index 0f3f278..94b1278 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS} import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task} import io.github.datacatering.datacaterer.core.listener.SparkRecordListener -import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML} +import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML} import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult} import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile @@ -50,47 +50,19 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig try { fileWriter(REPORT_HOME_HTML, htmlWriter.index(plan, stepSummary, taskSummary, dataSourceSummary, validationResults, dataCatererConfiguration.flagsConfig, sparkRecordListener)) - fileWriter("tasks.html", htmlWriter.taskDetails(taskSummary)) + fileWriter(REPORT_TASK_HTML, htmlWriter.taskDetails(taskSummary)) fileWriter(REPORT_FIELDS_HTML, htmlWriter.stepDetails(stepSummary)) fileWriter(REPORT_DATA_SOURCES_HTML, htmlWriter.dataSourceDetails(stepSummary.flatMap(_.dataSourceResults))) fileWriter(REPORT_VALIDATIONS_HTML, htmlWriter.validations(validationResults, validationConfig)) - writeStringToFile(fileSystem, s"$reportFolder/results.json", resultsAsJson(generationResult, validationResults)) - - copyHtmlResources(fileSystem, reportFolder) + writeStringToFile(fileSystem, s"$reportFolder/$REPORT_RESULT_JSON", resultsAsJson(generationResult, validationResults)) + writeStringToFile(fileSystem, s"$reportFolder/$REPORT_DATA_CATERING_SVG", htmlWriter.dataCateringSvg) + writeStringToFile(fileSystem, s"$reportFolder/$REPORT_MAIN_CSS", htmlWriter.mainCss) } catch { case ex: Exception => LOGGER.error("Failed to write data generation summary to HTML files", ex) } } - private def copyHtmlResources(fileSystem: FileSystem, folder: String): Unit = { - val resources = List("main.css", "data_catering_transparent.svg") - resources.foreach(resource => { - val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource") - val localResourcePath = new Path(s"file://app/src/main/resources/report/$resource") - val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI)) - val resourcePath = tryLocalUri match { - case Failure(_) => - Try(defaultResourcePath.toUri) match { - case Failure(_) => localResourcePath - case Success(_) => defaultResourcePath - } - case Success(value) => - Try(value.getName) match { - case Failure(_) => - Try(defaultResourcePath.toUri) match { - case Failure(_) => localResourcePath - case Success(_) => defaultResourcePath - } - case Success(name) => - if (name.startsWith("jar:")) defaultResourcePath else value - } - } - val destination = s"file:///$folder/$resource" - fileSystem.copyFromLocalFile(resourcePath, new Path(destination)) - }) - } - private def writeToFile(fileSystem: FileSystem, folderPath: String)(fileName: String, content: Node): Unit = { writeStringToFile(fileSystem, s"$folderPath/$fileName", content.toString()) } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala index fffb575..023c9a1 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala @@ -739,6 +739,183 @@ class ResultHtmlWriter { } + def mainCss: String = { + """.box-iframe { + | float: left; + | margin-right: 10px; + |} + | + |body { + | margin: 0; + |} + | + |.top-banner { + | height: fit-content; + | background-color: #ff6e42; + | padding: 0 .2rem; + | display: flex; + |} + | + |.top-banner span { + | color: #f2f2f2; + | font-size: 17px; + | padding: 5px 6px; + | display: flex; + | align-items: center; + |} + | + |.logo { + | padding: 5px; + | height: 45px; + | width: auto; + | display: flex; + | align-items: center; + | justify-content: center; + |} + | + |.logo:hover { + | background-color: #ff9100; + | color: black; + |} + | + |.top-banner img { + | height: 35px; + | width: auto; + | display: flex; + | justify-content: center; + | vertical-align: middle; + |} + | + |.topnav { + | overflow: hidden; + | background-color: #ff6e42; + |} + | + |.topnav a { + | float: left; + | color: #f2f2f2; + | text-align: center; + | padding: 8px 10px; + | text-decoration: none; + | font-size: 17px; + |} + | + |.topnav a:hover { + | background-color: #ff9100; + | color: black; + |} + | + |.topnav a.active { + | color: black; + |} + | + |table { + | overflow: hidden; + | transition: max-height 0.2s ease-out; + |} + | + |table.codegrid { + | font-family: monospace; + | font-size: 12px; + | width: auto !important; + |} + | + |table.statementlist { + | width: auto !important; + | font-size: 13px; + |} + | + |table.codegrid td { + | padding: 0 !important; + | border: 0 !important + |} + | + |table td.linenumber { + | width: 40px !important; + |} + | + |td { + | white-space: normal + |} + | + |.table thead th { + | position: sticky; + | top: 0; + | z-index: 1; + |} + | + |table, tr, td, th { + | border-collapse: collapse; + |} + | + |.table-collapsible { + | max-height: 0; + | overflow: hidden; + | transition: max-height 0.2s ease-out; + |} + | + |.collapsible { + | background-color: lightgray; + | color: black; + | cursor: pointer; + | width: 100%; + | border: none; + | text-align: left; + | outline: none; + |} + | + |.collapsible:after { + | content: "\02795"; /* Unicode character for "plus" sign (+) */ + | color: white; + | float: right; + |} + | + |.active:after { + | content: "\2796"; /* Unicode character for "minus" sign (-) */ + |} + | + |.outer-container { + | display: flex; + | flex-direction: column; + | height: 100vh; + |} + | + |.top-container { + | height: 50%; + | overflow: auto; + | resize: vertical; + |} + | + |.bottom-container { + | flex: 1; + | min-height: 0; + | height: 50%; + | overflow: auto; + | resize: vertical; + |} + | + |.slider { + | text-align: center; + | background-color: #dee2e6; + | cursor: row-resize; + | user-select: none; + |} + | + |.selected-row { + | background-color: #ff6e42 !important; + |} + | + |.progress { + | white-space: normal; + | background-color: #d9534f; + |} + | + |.progress-bar { + | color: black; + |} + |""".stripMargin + } + def plugins: NodeBuffer = { @@ -749,4 +926,6 @@ class ResultHtmlWriter { } + + def dataCateringSvg: String = "" } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala index a3ec359..dbfdf6d 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala @@ -54,9 +54,13 @@ object Constants { lazy val COUNT_NUM_RECORDS = "numRecords" //report + lazy val REPORT_DATA_CATERING_SVG = "data_catering_transparent.svg" lazy val REPORT_DATA_SOURCES_HTML = "data-sources.html" lazy val REPORT_FIELDS_HTML = "steps.html" lazy val REPORT_HOME_HTML = "index.html" + lazy val REPORT_MAIN_CSS = "main.css" + lazy val REPORT_RESULT_JSON = "results.json" + lazy val REPORT_TASK_HTML = "tasks.html" lazy val REPORT_VALIDATIONS_HTML = "validations.html" //connection group type diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala index 3b98c8b..d12dc4e 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala @@ -1,6 +1,7 @@ package io.github.datacatering.datacaterer.core.model import io.github.datacatering.datacaterer.api.model.{ExpressionValidation, Validation} +import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema @@ -29,15 +30,19 @@ case class ValidationConfigResult( } def jsonSummary(numErrorSamples: Int): Map[String, Any] = { - val validationRes = dataSourceValidationResults.flatMap(_.validationResults) + val validationRes = dataSourceValidationResults.flatMap(dsv => + dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v)) + ) if (validationRes.nonEmpty) { - val (numSuccess, successRate, isSuccess) = baseSummary(validationRes) - val errorMap = validationRes.filter(!_.isSuccess).map(res => { - val validationDetails = res.validation.toOptions.map(v => (v.head, v.last)).toMap + val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3)) + val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => { + val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap Map( + "dataSourceName" -> res._1, + "options" -> cleanseOptions(res._2), "validation" -> validationDetails, - "numErrors" -> res.numErrors, - "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res) + "numErrors" -> res._3.numErrors, + "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res._3) ) }) val baseValidationMap = Map( From 19371d469b9f5c99c100084f0c9f25542bd37cc6 Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 13:50:27 +0800 Subject: [PATCH 15/17] Take out clean from gradle command in github action --- .github/workflows/check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index d34e513..f92c23f 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -13,7 +13,7 @@ jobs: - name: Gradle build with cache uses: burrunan/gradle-cache-action@v1 with: - arguments: "clean :app:shadowJar" + arguments: ":app:shadowJar" - name: Run integration tests id: tests uses: data-catering/insta-integration@v1 From 6fa3f0c2a03fdc66ab59f636f0fa828de6fb8b9a Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 13:55:09 +0800 Subject: [PATCH 16/17] Use account_number instead of account_id for insta-integration validations --- insta-integration.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/insta-integration.yaml b/insta-integration.yaml index 286a2da..84038e6 100644 --- a/insta-integration.yaml +++ b/insta-integration.yaml @@ -14,13 +14,13 @@ run: - options: dbtable: account.balances validations: - - expr: ISNOTNULL(account_id) + - expr: ISNOTNULL(account_number) - aggType: count aggExpr: count == 1000 - options: dbtable: account.transactions validations: - - expr: ISNOTNULL(account_id) + - expr: ISNOTNULL(account_number) - aggType: count aggExpr: count == 5000 - groupByCols: [account_number] From db02a4d89f6a788b2207058a67a98d555d1d629b Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Tue, 9 Jul 2024 16:17:17 +0800 Subject: [PATCH 17/17] Fix insta-integration github action, don't try to generate more records when random amount of records defined, write css and svg to reports folder --- .github/workflows/check.yml | 6 +- app/src/main/resources/application.conf | 152 +++++++++------ .../core/generator/BatchDataProcessor.scala | 21 +- .../result/DataGenerationResultWriter.scala | 32 +--- .../generator/result/ResultHtmlWriter.scala | 179 ++++++++++++++++++ .../datacaterer/core/model/Constants.scala | 4 + .../core/model/ValidationModels.scala | 17 +- .../core/util/UniqueFieldsUtil.scala | 6 +- app/src/test/resources/application.conf | 13 +- .../sample/sql/postgres/customer.sql | 6 +- .../postgres-balance-transaction-task.yaml | 1 - insta-integration.yaml | 6 +- 12 files changed, 326 insertions(+), 117 deletions(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index ed9880a..f92c23f 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -10,8 +10,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Run build - run: "./gradlew clean :app:shadowJar" + - name: Gradle build with cache + uses: burrunan/gradle-cache-action@v1 + with: + arguments: ":app:shadowJar" - name: Run integration tests id: tests uses: data-catering/insta-integration@v1 diff --git a/app/src/main/resources/application.conf b/app/src/main/resources/application.conf index 14898ef..3966ef0 100644 --- a/app/src/main/resources/application.conf +++ b/app/src/main/resources/application.conf @@ -1,97 +1,144 @@ flags { - enableGeneratePlanAndTasks = false - enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS} enableCount = true enableCount = ${?ENABLE_COUNT} enableGenerateData = true enableGenerateData = ${?ENABLE_GENERATE_DATA} + enableGeneratePlanAndTasks = false + enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS} enableRecordTracking = false enableRecordTracking = ${?ENABLE_RECORD_TRACKING} enableDeleteGeneratedRecords = false enableDeleteGeneratedRecords = ${?ENABLE_DELETE_GENERATED_RECORDS} enableFailOnError = true enableFailOnError = ${?ENABLE_FAIL_ON_ERROR} + enableUniqueCheck = true + enableUniqueCheck = ${?ENABLE_UNIQUE_CHECK} enableSinkMetadata = true - enableSinkMetadata = ${?ENABLED_SINK_METADATA} + enableSinkMetadata = ${?ENABLE_SINK_METADATA} enableSaveReports = true - enableSaveReports = ${?ENABLED_SAVE_REPORTS} + enableSaveReports = ${?ENABLE_SAVE_REPORTS} enableValidation = false - enableValidation = ${?ENABLED_VALIDATION} + enableValidation = ${?ENABLE_VALIDATION} + enableGenerateValidations = false + enableGenerateValidations = ${?ENABLE_GENERATE_VALIDATIONS} + enableAlerts = false + enableAlerts = ${?ENABLE_ALERTS} } folders { generatedPlanAndTaskFolderPath = "/tmp" generatedPlanAndTaskFolderPath = ${?GENERATED_PLAN_AND_TASK_FOLDER_PATH} - planFilePath = "/plan/customer-create-plan.yaml" + planFilePath = "app/src/test/resources/sample/plan/customer-create-plan.yaml" planFilePath = ${?PLAN_FILE_PATH} - taskFolderPath = "/task" + taskFolderPath = "app/src/test/resources/sample/task" taskFolderPath = ${?TASK_FOLDER_PATH} recordTrackingFolderPath = "/tmp/data/generated/recordTracking" recordTrackingFolderPath = ${?RECORD_TRACKING_FOLDER_PATH} + recordTrackingForValidationFolderPath = "/tmp/data/validation/recordTracking" + recordTrackingForValidationFolderPath = ${?RECORD_TRACKING_VALIDATION_FOLDER_PATH} generatedReportsFolderPath = "app/src/test/resources/sample/html" generatedReportsFolderPath = ${?GENERATED_REPORTS_FOLDER_PATH} - validationFolderPath = "app/src/test/resources/sample/validation" - validationFolderPath = ${?VALIDATION_FOLDER_PATH} } metadata { numRecordsFromDataSource = 10000 - numRecordsFromDataSource = ${?METADATA_NUM_RECORDS_FROM_DATA_SOURCE} + numRecordsFromDataSource = ${?NUM_RECORDS_FROM_DATA_SOURCE} numRecordsForAnalysis = 10000 - numRecordsForAnalysis = ${?METADATA_NUM_RECORDS_FOR_ANALYSIS} + numRecordsForAnalysis = ${?NUM_RECORDS_FOR_ANALYSIS} oneOfDistinctCountVsCountThreshold = 0.1 - oneOfDistinctCountVsCountThreshold = ${?METADATA_ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD} + oneOfDistinctCountVsCountThreshold = ${?ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD} + oneOfMinCount = 1000 + oneOfMinCount = ${?ONE_OF_MIN_COUNT} + numGeneratedSamples = 10 + numGeneratedSamples = ${?NUM_GENERATED_SAMPLES} } generation { - numRecordsPerBatch = 100000 - numRecordsPerBatch = ${?GENERATION_NUM_RECORDS_PER_BATCH} + numRecordsPerBatch = 1000000 + numRecordsPerBatch = ${?NUM_RECORDS_PER_BATCH} } -validation {} -alert {} +validation { + numSampleErrorRecords = 5 + numSampleErrorRecords = ${?NUM_SAMPLE_ERROR_RECORDS} + enableDeleteRecordTrackingFiles = true + enableDeleteRecordTrackingFiles = ${?ENABLE_DELETE_RECORD_TRACKING_FILES} +} + +alert { + triggerOn = "all" + triggerOn = ${?ALERT_TRIGGER_ON} + slackAlertConfig { + token = "" + token = ${?ALERT_SLACK_TOKEN} + channels = [] + channels = ${?ALERT_SLACK_CHANNELS} + } +} runtime { master = "local[*]" master = ${?DATA_CATERER_MASTER} config { - "spark.sql.cbo.enabled" = "true" - "spark.sql.adaptive.enabled" = "true" - "spark.sql.cbo.planStats.enabled" = "true" - "spark.sql.legacy.allowUntypedScalaUDF" = "true" - "spark.sql.statistics.histogram.enabled" = "true" - "spark.sql.shuffle.partitions" = "10" - "spark.sql.catalog.postgres" = "" - "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog" - "spark.hadoop.fs.s3a.directory.marker.retention" = "keep" - "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true" - #"spark.hadoop.fs.defaultFS" = "s3a://my-bucket" + "spark.sql.cbo.enabled": "true", + "spark.sql.adaptive.enabled": "true", + "spark.sql.cbo.planStats.enabled": "true", + "spark.sql.legacy.allowUntypedScalaUDF": "true", + "spark.sql.legacy.allowParameterlessCount": "true", + "spark.sql.statistics.histogram.enabled": "true", + "spark.sql.shuffle.partitions": "10", + "spark.sql.catalog.postgres": "", + "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog", + "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type": "hadoop", + "spark.hadoop.fs.s3a.directory.marker.retention": "keep", + "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true", + "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem", + "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem", + "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + } +} + +json { + json { + } +} + +csv { + csv { + } +} + +delta { + delta { + } +} + +iceberg { + iceberg { + } +} + +orc { + orc { + } +} + +parquet { + parquet { } } -# connection type jdbc { -# connection name postgres { -# connection details url = "jdbc:postgresql://localhost:5432/customer" url = ${?POSTGRES_URL} user = "postgres" - user = ${?POSTGRES_USERNAME} + user = ${?POSTGRES_USER} password = "postgres" password = ${?POSTGRES_PASSWORD} driver = "org.postgresql.Driver" } - postgresDvd { - url = "jdbc:postgresql://localhost:5432/dvdrental" - url = ${?POSTGRES_URL} - user = "postgres" - user = ${?POSTGRES_USERNAME} - password = "postgres" - password = ${?POSTGRES_PASSWORD} - driver = "org.postgresql.Driver" - stringtype = "unspecified" - } mysql { url = "jdbc:mysql://localhost:3306/customer" url = ${?MYSQL_URL} @@ -103,6 +150,7 @@ jdbc { } } + org.apache.spark.sql.cassandra { cassandra { spark.cassandra.connection.host = "localhost" @@ -140,30 +188,10 @@ jms { kafka { kafka { - kafka.bootstrap.servers = "localhost:9092" + kafka.bootstrap.servers = "localhost:29092" kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} } } -parquet { - parquet { - path = "app/src/test/resources/sample" - path = ${?PARQUET_PATH} - } -} - -json { - json { - path = "app/src/test/resources/sample" - path = ${?JSON_PATH} - } -} - -csv { - csv { - path = "app/src/test/resources/sample" - path = ${?CSV_PATH} - } -} datastax-java-driver.advanced.metadata.schema.refreshed-keyspaces = [ "/.*/" ] diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala index f2f1e7c..97a32ca 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala @@ -68,14 +68,19 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String s"new-num-records=${additionalDf.count()}, actual-num-records=$dfRecordCount") } - while (targetNumRecords != dfRecordCount && retries < maxRetries) { - retries += 1 - generateAdditionalRecords() - } - if (targetNumRecords != dfRecordCount && retries == maxRetries) { - LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " + - s"Can be due to limited number of potential unique records, " + - s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}") + //if random amount of records, don't try to regenerate more records + if (s.count.generator.isEmpty && s.count.perColumn.forall(_.generator.isEmpty)) { + while (targetNumRecords != dfRecordCount && retries < maxRetries) { + retries += 1 + generateAdditionalRecords() + } + if (targetNumRecords != dfRecordCount && retries == maxRetries) { + LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " + + s"Can be due to limited number of potential unique records, " + + s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}") + } + } else { + LOGGER.debug("Random amount of records generated, not attempting to generate more records") } trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount)) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala index 6dbd151..94b1278 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS} import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task} import io.github.datacatering.datacaterer.core.listener.SparkRecordListener -import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML} +import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML} import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult} import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile @@ -50,41 +50,19 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig try { fileWriter(REPORT_HOME_HTML, htmlWriter.index(plan, stepSummary, taskSummary, dataSourceSummary, validationResults, dataCatererConfiguration.flagsConfig, sparkRecordListener)) - fileWriter("tasks.html", htmlWriter.taskDetails(taskSummary)) + fileWriter(REPORT_TASK_HTML, htmlWriter.taskDetails(taskSummary)) fileWriter(REPORT_FIELDS_HTML, htmlWriter.stepDetails(stepSummary)) fileWriter(REPORT_DATA_SOURCES_HTML, htmlWriter.dataSourceDetails(stepSummary.flatMap(_.dataSourceResults))) fileWriter(REPORT_VALIDATIONS_HTML, htmlWriter.validations(validationResults, validationConfig)) - writeStringToFile(fileSystem, s"$reportFolder/results.json", resultsAsJson(generationResult, validationResults)) - - copyHtmlResources(fileSystem, reportFolder) + writeStringToFile(fileSystem, s"$reportFolder/$REPORT_RESULT_JSON", resultsAsJson(generationResult, validationResults)) + writeStringToFile(fileSystem, s"$reportFolder/$REPORT_DATA_CATERING_SVG", htmlWriter.dataCateringSvg) + writeStringToFile(fileSystem, s"$reportFolder/$REPORT_MAIN_CSS", htmlWriter.mainCss) } catch { case ex: Exception => LOGGER.error("Failed to write data generation summary to HTML files", ex) } } - private def copyHtmlResources(fileSystem: FileSystem, folder: String): Unit = { - val resources = List("main.css", "data_catering_transparent.svg") - if (!foldersConfig.generatedReportsFolderPath.equalsIgnoreCase(DEFAULT_GENERATED_REPORTS_FOLDER_PATH)) { - resources.foreach(resource => { - val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource") - val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI)) - val resourcePath = tryLocalUri match { - case Failure(_) => - defaultResourcePath - case Success(value) => - Try(value.getName) match { - case Failure(_) => defaultResourcePath - case Success(name) => - if (name.startsWith("jar:")) defaultResourcePath else value - } - } - val destination = s"file:///$folder/$resource" - fileSystem.copyFromLocalFile(resourcePath, new Path(destination)) - }) - } - } - private def writeToFile(fileSystem: FileSystem, folderPath: String)(fileName: String, content: Node): Unit = { writeStringToFile(fileSystem, s"$folderPath/$fileName", content.toString()) } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala index fffb575..023c9a1 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala @@ -739,6 +739,183 @@ class ResultHtmlWriter { } + def mainCss: String = { + """.box-iframe { + | float: left; + | margin-right: 10px; + |} + | + |body { + | margin: 0; + |} + | + |.top-banner { + | height: fit-content; + | background-color: #ff6e42; + | padding: 0 .2rem; + | display: flex; + |} + | + |.top-banner span { + | color: #f2f2f2; + | font-size: 17px; + | padding: 5px 6px; + | display: flex; + | align-items: center; + |} + | + |.logo { + | padding: 5px; + | height: 45px; + | width: auto; + | display: flex; + | align-items: center; + | justify-content: center; + |} + | + |.logo:hover { + | background-color: #ff9100; + | color: black; + |} + | + |.top-banner img { + | height: 35px; + | width: auto; + | display: flex; + | justify-content: center; + | vertical-align: middle; + |} + | + |.topnav { + | overflow: hidden; + | background-color: #ff6e42; + |} + | + |.topnav a { + | float: left; + | color: #f2f2f2; + | text-align: center; + | padding: 8px 10px; + | text-decoration: none; + | font-size: 17px; + |} + | + |.topnav a:hover { + | background-color: #ff9100; + | color: black; + |} + | + |.topnav a.active { + | color: black; + |} + | + |table { + | overflow: hidden; + | transition: max-height 0.2s ease-out; + |} + | + |table.codegrid { + | font-family: monospace; + | font-size: 12px; + | width: auto !important; + |} + | + |table.statementlist { + | width: auto !important; + | font-size: 13px; + |} + | + |table.codegrid td { + | padding: 0 !important; + | border: 0 !important + |} + | + |table td.linenumber { + | width: 40px !important; + |} + | + |td { + | white-space: normal + |} + | + |.table thead th { + | position: sticky; + | top: 0; + | z-index: 1; + |} + | + |table, tr, td, th { + | border-collapse: collapse; + |} + | + |.table-collapsible { + | max-height: 0; + | overflow: hidden; + | transition: max-height 0.2s ease-out; + |} + | + |.collapsible { + | background-color: lightgray; + | color: black; + | cursor: pointer; + | width: 100%; + | border: none; + | text-align: left; + | outline: none; + |} + | + |.collapsible:after { + | content: "\02795"; /* Unicode character for "plus" sign (+) */ + | color: white; + | float: right; + |} + | + |.active:after { + | content: "\2796"; /* Unicode character for "minus" sign (-) */ + |} + | + |.outer-container { + | display: flex; + | flex-direction: column; + | height: 100vh; + |} + | + |.top-container { + | height: 50%; + | overflow: auto; + | resize: vertical; + |} + | + |.bottom-container { + | flex: 1; + | min-height: 0; + | height: 50%; + | overflow: auto; + | resize: vertical; + |} + | + |.slider { + | text-align: center; + | background-color: #dee2e6; + | cursor: row-resize; + | user-select: none; + |} + | + |.selected-row { + | background-color: #ff6e42 !important; + |} + | + |.progress { + | white-space: normal; + | background-color: #d9534f; + |} + | + |.progress-bar { + | color: black; + |} + |""".stripMargin + } + def plugins: NodeBuffer = { @@ -749,4 +926,6 @@ class ResultHtmlWriter { } + + def dataCateringSvg: String = "" } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala index a3ec359..dbfdf6d 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala @@ -54,9 +54,13 @@ object Constants { lazy val COUNT_NUM_RECORDS = "numRecords" //report + lazy val REPORT_DATA_CATERING_SVG = "data_catering_transparent.svg" lazy val REPORT_DATA_SOURCES_HTML = "data-sources.html" lazy val REPORT_FIELDS_HTML = "steps.html" lazy val REPORT_HOME_HTML = "index.html" + lazy val REPORT_MAIN_CSS = "main.css" + lazy val REPORT_RESULT_JSON = "results.json" + lazy val REPORT_TASK_HTML = "tasks.html" lazy val REPORT_VALIDATIONS_HTML = "validations.html" //connection group type diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala index 3b98c8b..d12dc4e 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala @@ -1,6 +1,7 @@ package io.github.datacatering.datacaterer.core.model import io.github.datacatering.datacaterer.api.model.{ExpressionValidation, Validation} +import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema @@ -29,15 +30,19 @@ case class ValidationConfigResult( } def jsonSummary(numErrorSamples: Int): Map[String, Any] = { - val validationRes = dataSourceValidationResults.flatMap(_.validationResults) + val validationRes = dataSourceValidationResults.flatMap(dsv => + dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v)) + ) if (validationRes.nonEmpty) { - val (numSuccess, successRate, isSuccess) = baseSummary(validationRes) - val errorMap = validationRes.filter(!_.isSuccess).map(res => { - val validationDetails = res.validation.toOptions.map(v => (v.head, v.last)).toMap + val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3)) + val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => { + val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap Map( + "dataSourceName" -> res._1, + "options" -> cleanseOptions(res._2), "validation" -> validationDetails, - "numErrors" -> res.numErrors, - "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res) + "numErrors" -> res._3.numErrors, + "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res._3) ) }) val baseValidationMap = Map( diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala index 32be44d..35e2202 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala @@ -100,8 +100,10 @@ class UniqueFieldsUtil(plan: Plan, executableTasks: List[(TaskSummary, Task)])(i val uniqueKeys = step.gatherUniqueFields val uniqueKeyUf = if (uniqueKeys.nonEmpty) uniqueKeys.map(u => UniqueFields(t._1.dataSourceName, step.name, List(u))) else List() val allKeys = primaryKeyUf ++ uniqueKeyUf - LOGGER.debug(s"Found unique fields that require unique values, " + - s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}") + if (allKeys.nonEmpty) { + LOGGER.debug(s"Found unique fields that require unique values, " + + s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}") + } allKeys }) }) diff --git a/app/src/test/resources/application.conf b/app/src/test/resources/application.conf index 2cc0ffb..cc30ce9 100644 --- a/app/src/test/resources/application.conf +++ b/app/src/test/resources/application.conf @@ -32,10 +32,15 @@ runtime{ "spark.sql.legacy.allowUntypedScalaUDF" = "true" "spark.sql.statistics.histogram.enabled" = "true" "spark.sql.shuffle.partitions" = "10" - "spark.sql.catalog.postgres" = "" - "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog" - "spark.hadoop.fs.s3a.directory.marker.retention" = "keep" - "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true" + "spark.sql.catalog.postgres": "", + "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog", + "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type": "hadoop", + "spark.hadoop.fs.s3a.directory.marker.retention": "keep", + "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true", + "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem", + "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem", + "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" } } diff --git a/app/src/test/resources/sample/sql/postgres/customer.sql b/app/src/test/resources/sample/sql/postgres/customer.sql index 2830421..d5bffb5 100644 --- a/app/src/test/resources/sample/sql/postgres/customer.sql +++ b/app/src/test/resources/sample/sql/postgres/customer.sql @@ -25,18 +25,18 @@ CREATE TABLE IF NOT EXISTS account.balances ( account_number VARCHAR(20) UNIQUE NOT NULL, create_time TIMESTAMP, + account_status VARCHAR(10), balance DOUBLE PRECISION, PRIMARY KEY (account_number, create_time) ); CREATE TABLE IF NOT EXISTS account.transactions ( - account_number VARCHAR(20) UNIQUE NOT NULL, + account_number VARCHAR(20) NOT NULL REFERENCES account.balances (account_number), create_time TIMESTAMP, transaction_id VARCHAR(20), amount DOUBLE PRECISION, - PRIMARY KEY (account_number, create_time, transaction_id), - CONSTRAINT fk_txn_account_number FOREIGN KEY (account_number) REFERENCES account.balances (account_number) + PRIMARY KEY (account_number, create_time, transaction_id) ); CREATE TABLE IF NOT EXISTS account.mapping diff --git a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml index c4cd1ed..1697365 100644 --- a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml +++ b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml @@ -13,7 +13,6 @@ steps: type: "regex" options: regex: "ACC1[0-9]{5,10}" - isUnique: true - name: "create_time" type: "timestamp" - name: "account_status" diff --git a/insta-integration.yaml b/insta-integration.yaml index dbf6b12..84038e6 100644 --- a/insta-integration.yaml +++ b/insta-integration.yaml @@ -6,19 +6,21 @@ run: env: PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml TASK_FOLDER_PATH: app/src/test/resources/sample/task + APPLICATION_CONFIG_PATH: app/src/main/resources/application.conf + generateFirst: false test: validation: postgres: - options: dbtable: account.balances validations: - - expr: ISNOTNULL(account_id) + - expr: ISNOTNULL(account_number) - aggType: count aggExpr: count == 1000 - options: dbtable: account.transactions validations: - - expr: ISNOTNULL(account_id) + - expr: ISNOTNULL(account_number) - aggType: count aggExpr: count == 5000 - groupByCols: [account_number]