From 7cdeb974eafd36efdd6b1320e9fce8a65d2cf25c Mon Sep 17 00:00:00 2001 From: jecos Date: Thu, 23 May 2024 08:55:33 -0400 Subject: [PATCH] feat: Add more checkpoint in enrich variants --- .../datalake/spark3/genomics/enriched/Variants.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/genomics/enriched/Variants.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/genomics/enriched/Variants.scala index f193f173..1ea0bba7 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/genomics/enriched/Variants.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/genomics/enriched/Variants.scala @@ -80,7 +80,7 @@ case class Variants(rc: RuntimeETLContext, participantId: Column = col("particip val variantsCheckpoint = if (checkpoint) variants.checkpoint() else variants variantsCheckpoint - .withFrequencies(participantId, affectedStatus, snv, splits) + .withFrequencies(participantId, affectedStatus, snv, splits, checkpoint) .withPopulations(data(thousand_genomes.id), data(topmed_bravo.id), data(gnomad_genomes_v2.id), data(gnomad_exomes_v2.id), data(gnomad_genomes_v3.id)) .withDbSNP(data(dbsnp.id)) .withClinvar(data(clinvar.id)) @@ -206,11 +206,15 @@ object Variants { } - def withFrequencies(participantId: Column, affectedStatus: Column, snv: DataFrame, splits: Seq[OccurrenceSplit]): DataFrame = splits match { + def withFrequencies(participantId: Column, affectedStatus: Column, snv: DataFrame, splits: Seq[OccurrenceSplit], checkpoint: Boolean = false): DataFrame = splits match { case Nil => df case _ => val variantWithFreq = snv.split(participantId = participantId, affectedStatus = affectedStatus, splits) - df.joinByLocus(variantWithFreq, "left") + if (checkpoint) { + df.joinByLocus(variantWithFreq.checkpoint(), "left").checkpoint() + } else { + df.joinByLocus(variantWithFreq, "left") + } } def withSpliceAi(spliceai: DataFrame)(implicit spark: SparkSession): DataFrame = {