Generate unique values when there are any fields with isUnique or isPrimaryKey set to true
pflooky committed Mar 15, 2024
1 parent 2c17dd3 commit 4c4f0b9
Showing 3 changed files with 25 additions and 7 deletions.
BatchDataProcessor.scala
@@ -1,12 +1,12 @@
 package io.github.datacatering.datacaterer.core.generator
 
 import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_GENERATE_DATA, ENABLE_DATA_GENERATION}
-import io.github.datacatering.datacaterer.core.util.GeneratorUtil.getDataSourceName
-import io.github.datacatering.datacaterer.core.util.RecordCountUtil.calculateNumBatches
-import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Task, TaskSummary}
+import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, GenerationConfig, MetadataConfig, Plan, Step, Task, TaskSummary}
 import io.github.datacatering.datacaterer.core.model.DataSourceResult
 import io.github.datacatering.datacaterer.core.sink.SinkFactory
+import io.github.datacatering.datacaterer.core.util.GeneratorUtil.getDataSourceName
+import io.github.datacatering.datacaterer.core.util.PlanImplicits.StepOps
+import io.github.datacatering.datacaterer.core.util.RecordCountUtil.calculateNumBatches
 import io.github.datacatering.datacaterer.core.util.{ForeignKeyUtil, UniqueFieldsUtil}
 import net.datafaker.Faker
 import org.apache.log4j.Logger
@@ -45,15 +45,15 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
         val endIndex = stepRecords.currentNumRecords + stepRecords.numRecordsPerBatch
 
         val genDf = dataGeneratorFactory.generateDataForStep(s, task._1.dataSourceName, startIndex, endIndex)
-        val df = if (s.gatherPrimaryKeys.nonEmpty && flagsConfig.enableUniqueCheck) uniqueFieldUtil.getUniqueFieldsValues(dataSourceStepName, genDf) else genDf
+        val df = getUniqueGeneratedRecords(uniqueFieldUtil, s, dataSourceStepName, genDf)
 
         if (!df.storageLevel.useMemory) df.cache()
         val dfRecordCount = if (flagsConfig.enableCount) df.count() else stepRecords.numRecordsPerBatch
         LOGGER.debug(s"Step record count for batch, batch=$batch, step-name=${s.name}, target-num-records=${stepRecords.numRecordsPerBatch}, actual-num-records=$dfRecordCount")
         trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount))
         (dataSourceStepName, df)
       } else {
-        LOGGER.debug("Step has data generation disabled")
+        LOGGER.info(s"Step has data generation disabled, data-source=${task._1.dataSourceName}")
         (dataSourceStepName, sparkSession.emptyDataFrame)
       }
     })
@@ -77,7 +77,7 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
       task._2.steps.map(s => (getDataSourceName(task._1, s), (s, task._2)))
     ).toMap
 
-    sinkDf.map(df => {
+    sinkDf.filter(s => !s._2.isEmpty).map(df => {
       val dataSourceName = df._1.split("\\.").head
       val (step, task) = stepAndTaskByDataSourceName(df._1)
       val dataSourceConfig = connectionConfigsByName.getOrElse(dataSourceName, Map())
@@ -87,6 +87,16 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
     })
   }
 
+  private def getUniqueGeneratedRecords(uniqueFieldUtil: UniqueFieldsUtil, s: Step, dataSourceStepName: String, genDf: DataFrame): DataFrame = {
+    if (s.gatherUniqueFields.nonEmpty || s.gatherPrimaryKeys.nonEmpty) {
+      LOGGER.debug(s"Ensuring field values are unique since there are fields with isUnique or isPrimaryKey set to true, " +
+        s"data-source-step-name=$dataSourceStepName")
+      uniqueFieldUtil.getUniqueFieldsValues(dataSourceStepName, genDf)
+    } else {
+      genDf
+    }
+  }
+
   private def getDataFaker(plan: Plan): Faker with Serializable = {
     val optSeed = plan.sinkOptions.flatMap(_.seed)
     val optLocale = plan.sinkOptions.flatMap(_.locale)
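The new getUniqueGeneratedRecords helper delegates the actual de-duplication to UniqueFieldsUtil.getUniqueFieldsValues, whose implementation is not part of this diff. For context, a minimal sketch of how per-batch unique filtering can work in Spark follows; the class and method names here are illustrative, not the project's actual code.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical sketch: keep one row per unique-field combination within each batch,
// then anti-join against values already emitted by earlier batches.
class UniqueFieldsFilterSketch {
  // previously emitted unique-field values, keyed by data-source step name
  private var seenValues: Map[String, DataFrame] = Map()

  def filterUnique(stepName: String, batchDf: DataFrame, uniqueCols: Seq[String]): DataFrame = {
    // de-duplicate within the current batch
    val deduped = batchDf.dropDuplicates(uniqueCols)
    // drop rows whose unique-field values were produced in a prior batch
    val unique = seenValues.get(stepName)
      .map(prev => deduped.join(prev, uniqueCols, "left_anti"))
      .getOrElse(deduped)
    // remember the values emitted so far, for the next batch
    val emitted = unique.select(uniqueCols.map(col): _*)
    seenValues += (stepName -> seenValues.get(stepName).map(_.union(emitted)).getOrElse(emitted))
    unique
  }
}

One trade-off worth noting: the running union of seen values grows with every batch, so a real implementation would likely cache or checkpoint it to keep the Spark plan from growing unboundedly.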
PlanProcessorTest.scala
@@ -269,4 +269,12 @@ class PlanProcessorTest extends SparkSuite {

     execute(foreignPlan, config, firstJsonTask, secondJsonTask, thirdJsonTask)
   }
+
+  class TestUniqueFields extends PlanRun {
+    val jsonTask = json("my_first_json", "/tmp/data/unique_json", Map("saveMode" -> "overwrite"))
+      .schema(
+        field.name("account_id").regex("ACC[0-9]{8}").unique(true)
+      )
+    execute(jsonTask)
+  }
 }
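The added test flags account_id as unique while generating it from a regex. As an illustration of what the guarantee means for the output (not part of the commit), the generated files can be checked with a plain Spark read; the path and field name come from the test above.

import org.apache.spark.sql.SparkSession

// Illustrative check: read the generated JSON and confirm every account_id is distinct.
val spark = SparkSession.builder().master("local[*]").appName("unique-check").getOrCreate()
val df = spark.read.json("/tmp/data/unique_json")
val total = df.count()
val distinctIds = df.select("account_id").distinct().count()
assert(total == distinctIds, s"expected $total unique account_id values, found $distinctIds")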
gradle.properties (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
 groupId=io.github.data-catering
-version=0.6.1
+version=0.6.2
 
 scalaVersion=2.12
 scalaSpecificVersion=2.12.15
