diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
index 95999d8c..c19f2a9c 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
@@ -1,12 +1,12 @@
 package io.github.datacatering.datacaterer.core.generator
 
 import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_GENERATE_DATA, ENABLE_DATA_GENERATION}
-import io.github.datacatering.datacaterer.core.util.GeneratorUtil.getDataSourceName
-import io.github.datacatering.datacaterer.core.util.RecordCountUtil.calculateNumBatches
-import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Task, TaskSummary}
+import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Step, Task, TaskSummary}
 import io.github.datacatering.datacaterer.core.model.DataSourceResult
 import io.github.datacatering.datacaterer.core.sink.SinkFactory
+import io.github.datacatering.datacaterer.core.util.GeneratorUtil.getDataSourceName
 import io.github.datacatering.datacaterer.core.util.PlanImplicits.StepOps
+import io.github.datacatering.datacaterer.core.util.RecordCountUtil.calculateNumBatches
 import io.github.datacatering.datacaterer.core.util.{ForeignKeyUtil, UniqueFieldsUtil}
 import net.datafaker.Faker
 import org.apache.log4j.Logger
@@ -45,7 +45,7 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
         val endIndex = stepRecords.currentNumRecords + stepRecords.numRecordsPerBatch
 
         val genDf = dataGeneratorFactory.generateDataForStep(s, task._1.dataSourceName, startIndex, endIndex)
-        val df = if (s.gatherPrimaryKeys.nonEmpty && flagsConfig.enableUniqueCheck) uniqueFieldUtil.getUniqueFieldsValues(dataSourceStepName, genDf) else genDf
+        val df = getUniqueGeneratedRecords(uniqueFieldUtil, s, dataSourceStepName, genDf)
         if (!df.storageLevel.useMemory) df.cache()
 
         val dfRecordCount = if (flagsConfig.enableCount) df.count() else stepRecords.numRecordsPerBatch
@@ -53,7 +53,7 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
         trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount))
         (dataSourceStepName, df)
       } else {
-        LOGGER.debug("Step has data generation disabled")
+        LOGGER.info(s"Step has data generation disabled, data-source=${task._1.dataSourceName}")
         (dataSourceStepName, sparkSession.emptyDataFrame)
       }
     })
@@ -77,7 +77,7 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
       task._2.steps.map(s => (getDataSourceName(task._1, s), (s, task._2)))
     ).toMap
 
-    sinkDf.map(df => {
+    sinkDf.filter(s => !s._2.isEmpty).map(df => {
       val dataSourceName = df._1.split("\\.").head
       val (step, task) = stepAndTaskByDataSourceName(df._1)
       val dataSourceConfig = connectionConfigsByName.getOrElse(dataSourceName, Map())
@@ -87,6 +87,16 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
     })
   }
 
+  private def getUniqueGeneratedRecords(uniqueFieldUtil: UniqueFieldsUtil, s: Step, dataSourceStepName: String, genDf: DataFrame): DataFrame = {
+    if (s.gatherUniqueFields.nonEmpty || s.gatherPrimaryKeys.nonEmpty) {
+      LOGGER.debug(s"Ensuring field values are unique since there are fields with isUnique or isPrimaryKey set to true, " +
+        s"data-source-step-name=$dataSourceStepName")
+      uniqueFieldUtil.getUniqueFieldsValues(dataSourceStepName, genDf)
+    } else {
+      genDf
+    }
+  }
+
   private def getDataFaker(plan: Plan): Faker with Serializable = {
     val optSeed = plan.sinkOptions.flatMap(_.seed)
     val optLocale = plan.sinkOptions.flatMap(_.locale)
diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala
index 866483f9..a133d01f 100644
--- a/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala
+++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala
@@ -269,4 +269,12 @@ class PlanProcessorTest extends SparkSuite {
 
     execute(foreignPlan, config, firstJsonTask, secondJsonTask, thirdJsonTask)
   }
+
+  class TestUniqueFields extends PlanRun {
+    val jsonTask = json("my_first_json", "/tmp/data/unique_json", Map("saveMode" -> "overwrite"))
+      .schema(
+        field.name("account_id").regex("ACC[0-9]{8}").unique(true)
+      )
+    execute(jsonTask)
+  }
 }
diff --git a/gradle.properties b/gradle.properties
index 4c0cc721..f9c2b69c 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,5 +1,5 @@
 groupId=io.github.data-catering
-version=0.6.1
+version=0.6.2
 scalaVersion=2.12
 scalaSpecificVersion=2.12.15
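
The body of UniqueFieldsUtil.getUniqueFieldsValues is not part of this diff, so the sketch below is only an illustration of the kind of filtering the new getUniqueGeneratedRecords helper delegates to: dropping generated rows that repeat a value combination across the fields flagged with isUnique or isPrimaryKey. The object and method names here are hypothetical, not the library's API, and the sketch only covers deduplication within a single batch.

    import org.apache.spark.sql.DataFrame

    // Hypothetical sketch, not the actual UniqueFieldsUtil implementation.
    // Within one generated batch, keep a single row per combination of the
    // fields that were flagged as unique or primary key; pass the DataFrame
    // through untouched when no such fields exist.
    object UniqueFieldsSketch {
      def dedupeOnUniqueFields(genDf: DataFrame, uniqueFields: Seq[String]): DataFrame =
        if (uniqueFields.isEmpty) genDf else genDf.dropDuplicates(uniqueFields)
    }

Because getUniqueGeneratedRecords only calls into UniqueFieldsUtil when the step's gatherUniqueFields or gatherPrimaryKeys is non-empty, steps without unique fields skip the deduplication cost entirely.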