diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b9f308c..b1171d3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file. ### Fixed - Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]). +- Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]). ### Removed @@ -24,6 +25,7 @@ All notable changes to this project will be documented in this file. [#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450 [#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451 [#459]: https://github.com/stackabletech/spark-k8s-operator/pull/459 +[#460]: https://github.com/stackabletech/spark-k8s-operator/pull/460 ## [24.7.0] - 2024-07-24 diff --git a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc new file mode 100644 index 00000000..ab38f728 --- /dev/null +++ b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc @@ -0,0 +1,7 @@ += Spark Applications + +Spark applications are submitted to the Spark Operator as SparkApplication resources. These resources are used to define the configuration of the Spark job, including the image to use, the main application file, and the number of executors to start. + +Upon creation, the application's status is set to `Unknown`. As the operator creates the necessary resources, the status of the application transitions through different phases that reflect the phase of the driver Pod. A successful application will eventually reach the `Succeeded` phase. + +NOTE: The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created. 
diff --git a/docs/modules/spark-k8s/partials/nav.adoc b/docs/modules/spark-k8s/partials/nav.adoc index 2fb175f1..a514d14d 100644 --- a/docs/modules/spark-k8s/partials/nav.adoc +++ b/docs/modules/spark-k8s/partials/nav.adoc @@ -11,6 +11,7 @@ ** xref:spark-k8s:usage-guide/history-server.adoc[] ** xref:spark-k8s:usage-guide/examples.adoc[] ** xref:spark-k8s:usage-guide/operations/index.adoc[] +*** xref:spark-k8s:usage-guide/operations/applications.adoc[] *** xref:spark-k8s:usage-guide/operations/pod-placement.adoc[] *** xref:spark-k8s:usage-guide/operations/pod-disruptions.adoc[] *** xref:spark-k8s:usage-guide/operations/graceful-shutdown.adoc[] diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 0bdfc051..8d195f74 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -230,6 +230,21 @@ pub struct JobDependencies { } impl SparkApplication { + /// Returns if this [`SparkApplication`] has already created a Kubernetes Job doing the actual `spark-submit`. + /// + /// This is needed because Kubernetes will remove the succeeded Job after some time. When the spark-k8s-operator is + /// restarted it would re-create the Job, resulting in the Spark job running multiple times. This function assumes + /// that the [`SparkApplication`]'s status will always be set when the Kubernetes Job is created. It therefore + /// checks if the status is set to determine if the Job was already created in the past. + /// + /// See the bug report [#457](https://github.com/stackabletech/spark-k8s-operator/issues/457) for details. 
+ pub fn k8s_job_has_been_created(&self) -> bool { + self.status + .as_ref() + .map(|s| !s.phase.is_empty()) + .unwrap_or_default() + } + pub fn submit_job_config_map_name(&self) -> String { format!("{app_name}-submit-job", app_name = self.name_any()) } diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index e9a794b5..a890a6c9 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -10,7 +10,7 @@ use product_config::writer::to_java_properties_string; use stackable_operator::time::Duration; use stackable_spark_k8s_crd::{ constants::*, s3logdir::S3LogDir, tlscerts, RoleConfig, SparkApplication, SparkApplicationRole, - SparkContainer, SubmitConfig, + SparkApplicationStatus, SparkContainer, SubmitConfig, }; use crate::product_logging::{self, resolve_vector_aggregator_address}; @@ -155,6 +155,12 @@ pub enum Error { CreateVolumes { source: stackable_spark_k8s_crd::Error, }, + + #[snafu(display("Failed to update status for application {name:?}"))] + ApplySparkApplicationStatus { + source: stackable_operator::client::Error, + name: String, + }, } type Result = std::result::Result; @@ -170,6 +176,14 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) let client = &ctx.client; + if spark_application.k8s_job_has_been_created() { + tracing::info!( + spark_application = spark_application.name_any(), + "Skipped reconciling SparkApplication with non empty status" + ); + return Ok(Action::await_change()); + } + let opt_s3conn = match spark_application.spec.s3connection.as_ref() { Some(s3bd) => s3bd .resolve( @@ -346,6 +360,22 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .await .context(ApplyApplicationSnafu)?; + // Fix for #457 + // Update the status of the SparkApplication immediately after creating the Job + // to ensure the Job is not created again after being recycled by Kubernetes. 
+ client + .apply_patch_status( + CONTROLLER_NAME, + spark_application.as_ref(), + &SparkApplicationStatus { + phase: "Unknown".to_string(), + }, + ) + .await + .with_context(|_| ApplySparkApplicationStatusSnafu { + name: spark_application.name_any(), + })?; + Ok(Action::await_change()) }