Merge pull request #71 from data-catering/insta-integration
Insta integration
pflooky authored Jul 9, 2024
2 parents e1436ed + e7ca434 commit 7d86517
Showing 12 changed files with 326 additions and 117 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/check.yml
@@ -10,8 +10,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run build
run: "./gradlew clean :app:shadowJar"
- name: Gradle build with cache
uses: burrunan/gradle-cache-action@v1
with:
arguments: ":app:shadowJar"
- name: Run integration tests
id: tests
uses: data-catering/insta-integration@v1
152 changes: 90 additions & 62 deletions app/src/main/resources/application.conf
@@ -1,97 +1,144 @@
flags {
enableGeneratePlanAndTasks = false
enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
enableCount = true
enableCount = ${?ENABLE_COUNT}
enableGenerateData = true
enableGenerateData = ${?ENABLE_GENERATE_DATA}
enableGeneratePlanAndTasks = false
enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
enableRecordTracking = false
enableRecordTracking = ${?ENABLE_RECORD_TRACKING}
enableDeleteGeneratedRecords = false
enableDeleteGeneratedRecords = ${?ENABLE_DELETE_GENERATED_RECORDS}
enableFailOnError = true
enableFailOnError = ${?ENABLE_FAIL_ON_ERROR}
enableUniqueCheck = true
enableUniqueCheck = ${?ENABLE_UNIQUE_CHECK}
enableSinkMetadata = true
enableSinkMetadata = ${?ENABLED_SINK_METADATA}
enableSinkMetadata = ${?ENABLE_SINK_METADATA}
enableSaveReports = true
enableSaveReports = ${?ENABLED_SAVE_REPORTS}
enableSaveReports = ${?ENABLE_SAVE_REPORTS}
enableValidation = false
enableValidation = ${?ENABLED_VALIDATION}
enableValidation = ${?ENABLE_VALIDATION}
enableGenerateValidations = false
enableGenerateValidations = ${?ENABLE_GENERATE_VALIDATIONS}
enableAlerts = false
enableAlerts = ${?ENABLE_ALERTS}
}

folders {
generatedPlanAndTaskFolderPath = "/tmp"
generatedPlanAndTaskFolderPath = ${?GENERATED_PLAN_AND_TASK_FOLDER_PATH}
planFilePath = "/plan/customer-create-plan.yaml"
planFilePath = "app/src/test/resources/sample/plan/customer-create-plan.yaml"
planFilePath = ${?PLAN_FILE_PATH}
taskFolderPath = "/task"
taskFolderPath = "app/src/test/resources/sample/task"
taskFolderPath = ${?TASK_FOLDER_PATH}
recordTrackingFolderPath = "/tmp/data/generated/recordTracking"
recordTrackingFolderPath = ${?RECORD_TRACKING_FOLDER_PATH}
recordTrackingForValidationFolderPath = "/tmp/data/validation/recordTracking"
recordTrackingForValidationFolderPath = ${?RECORD_TRACKING_VALIDATION_FOLDER_PATH}
generatedReportsFolderPath = "app/src/test/resources/sample/html"
generatedReportsFolderPath = ${?GENERATED_REPORTS_FOLDER_PATH}
validationFolderPath = "app/src/test/resources/sample/validation"
validationFolderPath = ${?VALIDATION_FOLDER_PATH}
}

metadata {
numRecordsFromDataSource = 10000
numRecordsFromDataSource = ${?METADATA_NUM_RECORDS_FROM_DATA_SOURCE}
numRecordsFromDataSource = ${?NUM_RECORDS_FROM_DATA_SOURCE}
numRecordsForAnalysis = 10000
numRecordsForAnalysis = ${?METADATA_NUM_RECORDS_FOR_ANALYSIS}
numRecordsForAnalysis = ${?NUM_RECORDS_FOR_ANALYSIS}
oneOfDistinctCountVsCountThreshold = 0.1
oneOfDistinctCountVsCountThreshold = ${?METADATA_ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
oneOfDistinctCountVsCountThreshold = ${?ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
oneOfMinCount = 1000
oneOfMinCount = ${?ONE_OF_MIN_COUNT}
numGeneratedSamples = 10
numGeneratedSamples = ${?NUM_GENERATED_SAMPLES}
}

generation {
numRecordsPerBatch = 100000
numRecordsPerBatch = ${?GENERATION_NUM_RECORDS_PER_BATCH}
numRecordsPerBatch = 1000000
numRecordsPerBatch = ${?NUM_RECORDS_PER_BATCH}
}

validation {}
alert {}
validation {
numSampleErrorRecords = 5
numSampleErrorRecords = ${?NUM_SAMPLE_ERROR_RECORDS}
enableDeleteRecordTrackingFiles = true
enableDeleteRecordTrackingFiles = ${?ENABLE_DELETE_RECORD_TRACKING_FILES}
}

alert {
triggerOn = "all"
triggerOn = ${?ALERT_TRIGGER_ON}
slackAlertConfig {
token = ""
token = ${?ALERT_SLACK_TOKEN}
channels = []
channels = ${?ALERT_SLACK_CHANNELS}
}
}

runtime {
master = "local[*]"
master = ${?DATA_CATERER_MASTER}
config {
"spark.sql.cbo.enabled" = "true"
"spark.sql.adaptive.enabled" = "true"
"spark.sql.cbo.planStats.enabled" = "true"
"spark.sql.legacy.allowUntypedScalaUDF" = "true"
"spark.sql.statistics.histogram.enabled" = "true"
"spark.sql.shuffle.partitions" = "10"
"spark.sql.catalog.postgres" = ""
"spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog"
"spark.hadoop.fs.s3a.directory.marker.retention" = "keep"
"spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true"
#"spark.hadoop.fs.defaultFS" = "s3a://my-bucket"
"spark.sql.cbo.enabled": "true",
"spark.sql.adaptive.enabled": "true",
"spark.sql.cbo.planStats.enabled": "true",
"spark.sql.legacy.allowUntypedScalaUDF": "true",
"spark.sql.legacy.allowParameterlessCount": "true",
"spark.sql.statistics.histogram.enabled": "true",
"spark.sql.shuffle.partitions": "10",
"spark.sql.catalog.postgres": "",
"spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog",
"spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.iceberg.type": "hadoop",
"spark.hadoop.fs.s3a.directory.marker.retention": "keep",
"spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
"spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
"spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
}
}

json {
json {
}
}

csv {
csv {
}
}

delta {
delta {
}
}

iceberg {
iceberg {
}
}

orc {
orc {
}
}

parquet {
parquet {
}
}

# connection type
jdbc {
# connection name
postgres {
# connection details
url = "jdbc:postgresql://localhost:5432/customer"
url = ${?POSTGRES_URL}
user = "postgres"
user = ${?POSTGRES_USERNAME}
user = ${?POSTGRES_USER}
password = "postgres"
password = ${?POSTGRES_PASSWORD}
driver = "org.postgresql.Driver"
}
postgresDvd {
url = "jdbc:postgresql://localhost:5432/dvdrental"
url = ${?POSTGRES_URL}
user = "postgres"
user = ${?POSTGRES_USERNAME}
password = "postgres"
password = ${?POSTGRES_PASSWORD}
driver = "org.postgresql.Driver"
stringtype = "unspecified"
}
mysql {
url = "jdbc:mysql://localhost:3306/customer"
url = ${?MYSQL_URL}
@@ -103,6 +150,7 @@ jdbc {
}
}


org.apache.spark.sql.cassandra {
cassandra {
spark.cassandra.connection.host = "localhost"
@@ -140,30 +188,10 @@ jms {

kafka {
kafka {
kafka.bootstrap.servers = "localhost:9092"
kafka.bootstrap.servers = "localhost:29092"
kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
}
}

parquet {
parquet {
path = "app/src/test/resources/sample"
path = ${?PARQUET_PATH}
}
}

json {
json {
path = "app/src/test/resources/sample"
path = ${?JSON_PATH}
}
}

csv {
csv {
path = "app/src/test/resources/sample"
path = ${?CSV_PATH}
}
}

datastax-java-driver.advanced.metadata.schema.refreshed-keyspaces = [ "/.*/" ]
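The application.conf above leans on HOCON's optional environment-variable substitution: each setting has a literal default followed by a line of the form key = ${?SOME_ENV_VAR}, and the override only takes effect when that variable is set. Below is a minimal Scala sketch, not part of this commit, showing how such a file resolves with the Typesafe Config library; the key names are taken from the config above, everything else is illustrative.

import com.typesafe.config.ConfigFactory

object ConfigOverrideExample extends App {
  // load() reads application.conf from the classpath and resolves substitutions,
  // so ${?ENABLE_COUNT} and friends fall back to the literal defaults when unset.
  val config = ConfigFactory.load()

  val enableCount = config.getBoolean("flags.enableCount")              // true unless ENABLE_COUNT overrides it
  val recordsPerBatch = config.getLong("generation.numRecordsPerBatch") // 1000000 unless NUM_RECORDS_PER_BATCH overrides it

  println(s"enableCount=$enableCount, numRecordsPerBatch=$recordsPerBatch")
}

The runtime.master and runtime.config entries are Spark settings applied at session start-up. A hedged sketch of how such a map could be fed into a SparkSession builder follows; the function and application names are assumptions, not the project's actual code.

import org.apache.spark.sql.SparkSession

def buildSparkSession(master: String, sparkConf: Map[String, String]): SparkSession = {
  // Start from the configured master (e.g. "local[*]") and apply each key/value pair.
  val base = SparkSession.builder().master(master).appName("data-caterer-example")
  sparkConf.foldLeft(base) { case (builder, (key, value)) => builder.config(key, value) }.getOrCreate()
}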
@@ -68,14 +68,19 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
s"new-num-records=${additionalDf.count()}, actual-num-records=$dfRecordCount")
}

while (targetNumRecords != dfRecordCount && retries < maxRetries) {
retries += 1
generateAdditionalRecords()
}
if (targetNumRecords != dfRecordCount && retries == maxRetries) {
LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
s"Can be due to limited number of potential unique records, " +
s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
//if random amount of records, don't try to regenerate more records
if (s.count.generator.isEmpty && s.count.perColumn.forall(_.generator.isEmpty)) {
while (targetNumRecords != dfRecordCount && retries < maxRetries) {
retries += 1
generateAdditionalRecords()
}
if (targetNumRecords != dfRecordCount && retries == maxRetries) {
LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
s"Can be due to limited number of potential unique records, " +
s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
}
} else {
LOGGER.debug("Random amount of records generated, not attempting to generate more records")
}

trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount))
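The change above wraps the retry loop in a guard: when the step's count is driven by a random generator (s.count.generator or a per-column generator is defined), the exact target is not meaningful, so no additional records are generated. A standalone Scala sketch of that guarded-retry pattern, with illustrative names rather than the project's actual classes:

def generateUntilTarget(hasRandomCountGenerator: Boolean,
                        targetNumRecords: Long,
                        maxRetries: Int)(regenerateAndCount: () => Long): Long = {
  var recordCount = regenerateAndCount()
  if (!hasRandomCountGenerator) {
    var retries = 0
    // Keep generating until the target is hit or retries are exhausted,
    // analogous to generateAdditionalRecords() in the diff above.
    while (recordCount != targetNumRecords && retries < maxRetries) {
      retries += 1
      recordCount = regenerateAndCount()
    }
    if (recordCount != targetNumRecords && retries == maxRetries) {
      println(s"Unable to reach target after $maxRetries retries, " +
        s"target-num-records=$targetNumRecords, actual-num-records=$recordCount")
    }
  } else {
    println("Random record count requested, skipping additional generation attempts")
  }
  recordCount
}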
Expand Up @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS}
import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML}
import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML}
import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile
@@ -50,41 +50,19 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig
try {
fileWriter(REPORT_HOME_HTML, htmlWriter.index(plan, stepSummary, taskSummary, dataSourceSummary,
validationResults, dataCatererConfiguration.flagsConfig, sparkRecordListener))
fileWriter("tasks.html", htmlWriter.taskDetails(taskSummary))
fileWriter(REPORT_TASK_HTML, htmlWriter.taskDetails(taskSummary))
fileWriter(REPORT_FIELDS_HTML, htmlWriter.stepDetails(stepSummary))
fileWriter(REPORT_DATA_SOURCES_HTML, htmlWriter.dataSourceDetails(stepSummary.flatMap(_.dataSourceResults)))
fileWriter(REPORT_VALIDATIONS_HTML, htmlWriter.validations(validationResults, validationConfig))
writeStringToFile(fileSystem, s"$reportFolder/results.json", resultsAsJson(generationResult, validationResults))

copyHtmlResources(fileSystem, reportFolder)
writeStringToFile(fileSystem, s"$reportFolder/$REPORT_RESULT_JSON", resultsAsJson(generationResult, validationResults))
writeStringToFile(fileSystem, s"$reportFolder/$REPORT_DATA_CATERING_SVG", htmlWriter.dataCateringSvg)
writeStringToFile(fileSystem, s"$reportFolder/$REPORT_MAIN_CSS", htmlWriter.mainCss)
} catch {
case ex: Exception =>
LOGGER.error("Failed to write data generation summary to HTML files", ex)
}
}

private def copyHtmlResources(fileSystem: FileSystem, folder: String): Unit = {
val resources = List("main.css", "data_catering_transparent.svg")
if (!foldersConfig.generatedReportsFolderPath.equalsIgnoreCase(DEFAULT_GENERATED_REPORTS_FOLDER_PATH)) {
resources.foreach(resource => {
val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource")
val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI))
val resourcePath = tryLocalUri match {
case Failure(_) =>
defaultResourcePath
case Success(value) =>
Try(value.getName) match {
case Failure(_) => defaultResourcePath
case Success(name) =>
if (name.startsWith("jar:")) defaultResourcePath else value
}
}
val destination = s"file:///$folder/$resource"
fileSystem.copyFromLocalFile(resourcePath, new Path(destination))
})
}
}

private def writeToFile(fileSystem: FileSystem, folderPath: String)(fileName: String, content: Node): Unit = {
writeStringToFile(fileSystem, s"$folderPath/$fileName", content.toString())
}
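Instead of copying the bundled main.css and data_catering_transparent.svg resources into the reports folder, the writer now emits the report assets directly through writeStringToFile. A hedged sketch of writing a string to a path via the Hadoop FileSystem API, which the project's helper presumably wraps (the function name and paths here are illustrative):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def writeStringToFileSketch(fs: FileSystem, filePath: String, content: String): Unit = {
  // create(path, overwrite = true) replaces any existing report file
  val out = fs.create(new Path(filePath), true)
  try out.write(content.getBytes(StandardCharsets.UTF_8))
  finally out.close()
}

// Usage against the local file system (illustrative paths):
// val fs = FileSystem.get(new Configuration())
// writeStringToFileSketch(fs, "/tmp/report/results.json", """{"status": "ok"}""")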
