diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala index f04ba04a3..d7319cb91 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala @@ -157,7 +157,7 @@ object OperationDef { val dependencyConfigs = conf.getConfigList(DEPENDENCIES_KEY) dependencyConfigs.asScala .zipWithIndex - .map { case (c, i) => MetastoreDependency.fromConfig(c, s"$parent[$i]") } + .map { case (c, i) => MetastoreDependency.fromConfig(c, s"$parent[${i + 1}]") } .toSeq } else { Seq.empty[MetastoreDependency] diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PipelineDef.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PipelineDef.scala index fe25f6474..bf2e854f0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PipelineDef.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PipelineDef.scala @@ -38,7 +38,7 @@ object PipelineDef { val environment = conf.getString(ENVIRONMENT_NAME) val operations = ConfigUtils.getOptionConfigList(conf, OPERATIONS_KEY) .zipWithIndex - .flatMap{ case (c, i) => OperationDef.fromConfig(c, conf, infoDateConfig, s"$OPERATIONS_KEY[$i]", defaultDelayDays) } + .flatMap{ case (c, i) => OperationDef.fromConfig(c, conf, infoDateConfig, s"$OPERATIONS_KEY[${i+1}]", defaultDelayDays) } .toSeq PipelineDef(name, environment, operations) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala index b19cea0b4..b387b7b2e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala @@ -33,7 +33,6 @@ import java.util.concurrent.Executors.newFixedThreadPool import scala.concurrent.ExecutionContext.fromExecutorService import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContextExecutorService, Future} -import scala.util.Try import scala.util.control.NonFatal class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, @@ -72,6 +71,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, if (!loopStarted) { throw new IllegalStateException("Worker loop hasn't started yet") } + log.info("Waiting for worker threads to finish...") Await.result(workersFuture, Duration.Inf) executionContext.shutdown() loopStarted = false @@ -86,23 +86,24 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, completedJobsChannel.send((job, Nil, isSucceeded)) } catch { - case ex: FatalErrorWrapper if ex.cause != null => - log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex.cause) - val fatalEx = new RuntimeException(s"FATAL exception encountered, stopping the pipeline.", ex.cause) - completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(fatalEx), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false)) - completedJobsChannel.close() - case NonFatal(ex) => - completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false)) - case ex: Throwable => - log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex) - val fatalEx = new RuntimeException(s"FATAL exception encountered, stopping the pipeline.", ex) - completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(fatalEx), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false)) - completedJobsChannel.close() + case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient) + case NonFatal(ex) => sendFailure(ex, job, isTransient) + case ex: Throwable => onFatalException(ex, job, isTransient) } } completedJobsChannel.close() } + private[core] def onFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = { + log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex) + val fatalEx = new FatalErrorWrapper(s"FATAL exception encountered, stopping the pipeline.", ex) + sendFailure(fatalEx, job, isTransient) + } + + private[core] def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = { + completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false)) + } + private[core] def runJob(job: Job): Boolean = { val scheduleParams = ScheduleParams.fromRuntimeConfig(runtimeConfig, job.trackDays, job.operation.expectedDelayDays) @@ -121,7 +122,15 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, val fut = taskRunner.runJobTasks(job, taskDefs) + log.info("Waiting for all job tasks to finish...") val statuses = Await.result(fut, Duration.Inf) + log.info("All job tasks have finished.") + + // Rethrow fatal errors so the pipeline can be stopped asap. + statuses.foreach { + case RunStatus.Failed(ex) if ex.isInstanceOf[FatalErrorWrapper] => throw ex + case _ => // skip + } statuses.forall(s => !s.isFailure) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala index 4738fcda7..eaede2061 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory import za.co.absa.pramen.api.DataFormat import za.co.absa.pramen.core.app.AppContext +import za.co.absa.pramen.core.exceptions.FatalErrorWrapper import za.co.absa.pramen.core.pipeline.{Job, JobDependency, OperationType} import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunner import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils.evaluateRunDate @@ -77,11 +78,14 @@ class OrchestratorImpl extends Orchestrator { jobRunner.startWorkerLoop(runJobChannel) val atLeastOneStarted = sendPendingJobs(runJobChannel, dependencyResolver) + var hasFatalErrors = false if (atLeastOneStarted) { completedJobsChannel.foreach { case (finishedJob, taskResults, isSucceeded) => runningJobs.remove(finishedJob) + hasFatalErrors = hasFatalErrors || taskResults.exists(status => isFatalFailure(status.runStatus)) + val hasAnotherUnfinishedJob = hasAnotherJobWithSameOutputTable(finishedJob.outputTable.name) if (hasAnotherUnfinishedJob) { log.info(s"There is another job outputting to ${finishedJob.outputTable.name}. Waiting for it to finish before marking the table as finished.") @@ -93,9 +97,18 @@ class OrchestratorImpl extends Orchestrator { state.addTaskCompletion(taskResults) - val jobStarted = sendPendingJobs(runJobChannel, dependencyResolver) - if (!jobStarted && runningJobs.isEmpty) { - runJobChannel.close() + if (hasFatalErrors) { + // In case of a fatal error we either need to interrupt running threads, or wait for them to return. + // In the current implementation we wait for threads to finish, but not start new jobs in running threads. + // This can be also reconsidered, if there are issues with the current solutions observed. + if (runningJobs.isEmpty) { + runJobChannel.close() + } + } else { + val jobStarted = sendPendingJobs(runJobChannel, dependencyResolver) + if (!jobStarted && runningJobs.isEmpty) { + runJobChannel.close() + } } } } @@ -117,6 +130,13 @@ class OrchestratorImpl extends Orchestrator { jobRunner.shutdown() } + private def isFatalFailure(runStatus: RunStatus): Boolean = { + runStatus match { + case RunStatus.Failed(ex) if ex.isInstanceOf[FatalErrorWrapper] => true + case _ => false + } + } + private def hasNonPassiveNonOptionalDeps(job: Job, missingTables: Seq[String]): Boolean = { missingTables.exists(table => job.operation.dependencies.exists(d => !d.isPassive && !d.isOptional && d.tables.contains(table)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index b1bae19f3..67c718884 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -23,7 +23,7 @@ import org.slf4j.LoggerFactory import za.co.absa.pramen.api.{DataFormat, Reason, TaskNotification} import za.co.absa.pramen.core.app.config.RuntimeConfig import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.exceptions.ReasonException +import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ReasonException} import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.journal.model.TaskCompleted import za.co.absa.pramen.core.lock.TokenLockFactory @@ -323,7 +323,7 @@ abstract class TaskRunnerBase(conf: Config, Seq.empty) } } catch { - case ex: Throwable => Failure(ex) + case ex: Throwable => Failure(new FatalErrorWrapper("Fatal error has occurred.", ex)) } finally { lock.release() } diff --git a/pramen/core/src/test/resources/test/config/integration_parallel_execution.conf b/pramen/core/src/test/resources/test/config/integration_parallel_execution.conf new file mode 100644 index 000000000..412d52a4a --- /dev/null +++ b/pramen/core/src/test/resources/test/config/integration_parallel_execution.conf @@ -0,0 +1,251 @@ +# Copyright 2022 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This variable is expected to be set up by the test suite +#base.path = "/tmp" +#fatal.exception = false + +pramen { + pipeline.name = "Integration test of parallel execution" + + bookkeeping.enabled = false + stop.spark.session = false + + parallel.tasks = 4 +} + +pramen.metastore { + tables = [ + { + name = "table1" + description = "Table 1" + format = "parquet" + path = ${base.path}/table1 + }, + { + name = "table2" + description = "Table 2" + format = "parquet" + path = ${base.path}/table2 + }, + { + name = "table3" + description = "Table 3" + format = "parquet" + path = ${base.path}/table3 + }, + { + name = "table4" + description = "Table 4" + format = "parquet" + path = ${base.path}/table4 + }, + { + name = "table5" + description = "Table 5" + format = "parquet" + path = ${base.path}/table5 + }, + { + name = "table6" + description = "Table 6" + format = "parquet" + path = ${base.path}/table6 + } + ] +} + +pramen.sources.1 = [ + { + name = "spark_source" + factory.class = "za.co.absa.pramen.core.source.SparkSource" + + has.information.date.column = false + } +] + +pramen.sinks.1 = [ + { + name = "spark_sink" + factory.class = "za.co.absa.pramen.core.sink.SparkSink" + + format = "parquet" + mode = "overwrite" + } +] + +pramen.operations = [ + { + name = "Loading data from Spark Catalog" + type = "ingestion" + schedule.type = "daily" + + source = "spark_source" + + info.date.expr = "@runDate" + + tables = [ + { + input.table = "my_table1" + output.metastore.table = "table1" + } + ] + }, + { + name = "Generate some data from the transformer" + type = "transformation" + + class = "za.co.absa.pramen.core.mocks.transformer.GeneratingTransformer" + schedule.type = "daily" + + info.date.expr = "@runDate" + + dependencies = [ ] + + output.table = "table2" + }, + { + name = "First transformation" + type = "transformation" + + class = "za.co.absa.pramen.core.transformers.IdentityTransformer" + schedule.type = "daily" + + info.date.expr = "@runDate" + + dependencies = [ + { + tables = [ table2 ] + date.from = "@infoDate" + #optional = true + } + ] + + option { + input.table = "table2" + } + + output.table = "table3" + }, + { + name = "Failing transformation 1" + type = "transformation" + + class = "za.co.absa.pramen.core.mocks.transformer.FailingTransformer" + schedule.type = "daily" + + info.date.expr = "@runDate" + + dependencies = [ + { + tables = [ table3 ] + date.from = "@infoDate" + #optional = true + } + ] + + option { + fail.validation = true + fatal.exception = ${fatal.exception} + } + + output.table = "table4" + }, + { + name = "Failing transformation 2" + type = "transformation" + + class = "za.co.absa.pramen.core.mocks.transformer.FailingTransformer" + schedule.type = "daily" + + info.date.expr = "@runDate" + + dependencies = [ + { + tables = [ table3 ] + date.from = "@infoDate" + #optional = true + } + ] + + option { + fail.validation = false + fatal.exception = ${fatal.exception} + } + + output.table = "table5" + }, + { + name = "Failing transformation 3" + type = "transformation" + + class = "za.co.absa.pramen.core.mocks.transformer.FailingTransformer" + schedule.type = "daily" + + info.date.expr = "@runDate" + + dependencies = [ + { + tables = [ table4 ] + date.from = "@infoDate" + #optional = true + } + ] + + option { + fail.validation = false + fatal.exception = false + } + + output.table = "table6" + }, + { + name = "Sinking to parquet 1" + type = "sink" + sink = "spark_sink" + schedule.type = "daily" + + tables = [ + { + input.metastore.table = table3 + output.path = ${base.path}"/sink3" + } + ] + }, + { + name = "Sinking to parquet 2" + type = "sink" + sink = "spark_sink" + schedule.type = "daily" + + tables = [ + { + input.metastore.table = table4 + output.path = ${base.path}"/sink4" + } + ] + }, + { + name = "Sinking to parquet 3" + type = "sink" + sink = "spark_sink" + schedule.type = "daily" + + tables = [ + { + input.metastore.table = table5 + output.path = ${base.path}"/sink5" + } + ] + } +] diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/MultiJobTransformersSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/MultiJobTransformersLongSuite.scala similarity index 94% rename from pramen/core/src/test/scala/za/co/absa/pramen/core/integration/MultiJobTransformersSuite.scala rename to pramen/core/src/test/scala/za/co/absa/pramen/core/integration/MultiJobTransformersLongSuite.scala index 8de63491d..f64a2de76 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/MultiJobTransformersSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/MultiJobTransformersLongSuite.scala @@ -27,7 +27,7 @@ import za.co.absa.pramen.core.utils.ResourceUtils import java.time.LocalDate -class MultiJobTransformersSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { +class MultiJobTransformersLongSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { import spark.implicits._ private val infoDateSaturday = LocalDate.parse("2023-11-04") @@ -60,10 +60,10 @@ class MultiJobTransformersSuite extends AnyWordSpec with SparkTestBase with Temp } "work for a day with a different job (source)" in { - withTempDirectory("integration_multi_table_transformers") { tempDir => - val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b") + val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b") + df.createOrReplaceTempView("my_table1") - df.createOrReplaceTempView("my_table1") + withTempDirectory("integration_multi_table_transformers") { tempDir => val conf = getConfig(tempDir, infoDateMonday, enableMultiJobPerOutputTable = true) @@ -85,6 +85,8 @@ class MultiJobTransformersSuite extends AnyWordSpec with SparkTestBase with Temp assert(actual2.length == 3) assert(actual2.exists(_.contains(""""a":"A""""))) } + + spark.catalog.dropTempView("my_table1") } "fail if multi jobs per output table is not enabled" in { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala new file mode 100644 index 000000000..c8a882628 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala @@ -0,0 +1,111 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.integration + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.hadoop.fs.Path +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.core.base.SparkTestBase +import za.co.absa.pramen.core.fixtures.TempDirFixture +import za.co.absa.pramen.core.runner.AppRunner +import za.co.absa.pramen.core.utils.ResourceUtils + +import java.time.LocalDate + +class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { + import spark.implicits._ + + private val infoDate = LocalDate.parse("2023-11-04") + + "Pipelines with parallel executions" should { + "work for a pipeline with failures" in { + val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b") + df.createOrReplaceTempView("my_table1") + + withTempDirectory("integration_parallel_executions1") { tempDir => + val conf = getConfig(tempDir, infoDate) + + val exitCode = AppRunner.runPipeline(conf) + + assert(exitCode == 2) + + val table3Path = new Path(tempDir, "table3") + val sink3Path = new Path(tempDir, "sink3") + + val dfOut1 = spark.read.parquet(table3Path.toString) + val dfOut2 = spark.read.parquet(sink3Path.toString) + + val actual1 = dfOut1.toJSON.collect() + val actual2 = dfOut2.toJSON.collect() + + assert(actual1.length == 3) + assert(actual1.exists(_.contains(""""a":"E""""))) + assert(actual2.length == 3) + assert(actual2.exists(_.contains(""""a":"E""""))) + } + + spark.catalog.dropTempView("my_table1") + } + + "work for a pipeline with fatal exceptions" in { + val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b") + df.createOrReplaceTempView("my_table1") + + withTempDirectory("integration_parallel_executions2") { tempDir => + val conf = getConfig(tempDir, infoDate, fatalException = true) + + val exitCode = AppRunner.runPipeline(conf) + + assert(exitCode == 2) + + val table3Path = new Path(tempDir, "table3") + val sink3Path = new Path(tempDir, "sink3") + + val dfOut1 = spark.read.parquet(table3Path.toString) + val dfOut2 = spark.read.parquet(sink3Path.toString) + + val actual1 = dfOut1.toJSON.collect() + val actual2 = dfOut2.toJSON.collect() + + assert(actual1.length == 3) + assert(actual1.exists(_.contains(""""a":"E""""))) + assert(actual2.length == 3) + assert(actual2.exists(_.contains(""""a":"E""""))) + } + + spark.catalog.dropTempView("my_table1") + } + } + + def getConfig(basePath: String, infoDate: LocalDate, fatalException: Boolean = false): Config = { + val configContents = ResourceUtils.getResourceString("/test/config/integration_parallel_execution.conf") + val basePathEscaped = basePath.replace("\\", "\\\\") + + val conf = ConfigFactory.parseString( + s"""base.path = "$basePathEscaped" + |pramen.runtime.is.rerun = true + |pramen.current.date = "$infoDate" + |fatal.exception = $fatalException + |$configContents + |""".stripMargin + ).withFallback(ConfigFactory.load()) + .resolve() + + conf + } + +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/transformer/FailingTransformer.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/transformer/FailingTransformer.scala new file mode 100644 index 000000000..a33dfd57a --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/transformer/FailingTransformer.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.mocks.transformer + +import org.apache.spark.sql.DataFrame +import za.co.absa.pramen.api.{MetastoreReader, Reason, Transformer} + +import java.time.LocalDate + +class FailingTransformer extends Transformer { + override def validate(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): Reason = { + if (options.contains("fail.validation") && options("fail.validation").toBoolean) { + Reason.NotReady("Validation failed") + } else { + Reason.Ready + } + } + + override def run(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): DataFrame = { + if (options.contains("fatal.exception") && options("fatal.exception").toBoolean) { + throw new AbstractMethodError("Error in test") + } else { + throw new RuntimeException("Error in test") + } + } +}