fix: ensure a spark application can only be submitted once (#460)
* fix: ensure a spark application can only be submitted once

* update changelog

* add doc page for app status

* add callout

* fix typos

* Update rust/operator-binary/src/spark_k8s_controller.rs

Co-authored-by: Sebastian Bernauer <[email protected]>

* Update rust/operator-binary/src/spark_k8s_controller.rs

Co-authored-by: Sebastian Bernauer <[email protected]>

* implement review feedback

* Update rust/operator-binary/src/spark_k8s_controller.rs

Co-authored-by: Sebastian Bernauer <[email protected]>

* Update rust/operator-binary/src/spark_k8s_controller.rs

Co-authored-by: Nick <[email protected]>

---------

Co-authored-by: Sebastian Bernauer <[email protected]>
Co-authored-by: Nick <[email protected]>
3 people authored Sep 16, 2024
1 parent 0a83d7b commit 9cd61dd
Showing 5 changed files with 56 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
### Fixed

- Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]).
- Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]).

### Removed

@@ -24,6 +25,7 @@ All notable changes to this project will be documented in this file.
[#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450
[#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451
[#459]: https://github.com/stackabletech/spark-k8s-operator/pull/459
[#460]: https://github.com/stackabletech/spark-k8s-operator/pull/460

## [24.7.0] - 2024-07-24

@@ -0,0 +1,7 @@
= Spark Applications

Spark applications are submitted to the Spark Operator as SparkApplication resources. These resources are used to define the configuration of the Spark job, including the image to use, the main application file, and the number of executors to start.

Upon creation, the application's status is set to `Unknown`. As the operator creates the necessary resources, the application's status transitions through phases that mirror the phase of the driver Pod. A successful application eventually reaches the `Succeeded` phase.

NOTE: The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created.
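The phase described above lives on the resource's status subresource and can be inspected directly. An illustrative fragment (the application name, the `Succeeded` value, and the `spark.stackable.tech/v1alpha1` API version are examples, not output from a real cluster):

```yaml
# Illustrative status of a finished application (field values are examples).
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: pyspark-pi
status:
  phase: Succeeded
```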
1 change: 1 addition & 0 deletions docs/modules/spark-k8s/partials/nav.adoc
@@ -11,6 +11,7 @@
** xref:spark-k8s:usage-guide/history-server.adoc[]
** xref:spark-k8s:usage-guide/examples.adoc[]
** xref:spark-k8s:usage-guide/operations/index.adoc[]
*** xref:spark-k8s:usage-guide/operations/applications.adoc[]
*** xref:spark-k8s:usage-guide/operations/pod-placement.adoc[]
*** xref:spark-k8s:usage-guide/operations/pod-disruptions.adoc[]
*** xref:spark-k8s:usage-guide/operations/graceful-shutdown.adoc[]
15 changes: 15 additions & 0 deletions rust/crd/src/lib.rs
@@ -230,6 +230,21 @@ pub struct JobDependencies {
}

impl SparkApplication {
/// Returns if this [`SparkApplication`] has already created a Kubernetes Job doing the actual `spark-submit`.
///
/// This is needed because Kubernetes will remove the succeeded Job after some time. When the spark-k8s-operator is
/// restarted it would re-create the Job, resulting in the Spark job running multiple times. This function assumes
/// that the [`SparkApplication`]'s status will always be set when the Kubernetes Job is created. It therefore
/// checks if the status is set to determine if the Job was already created in the past.
///
/// See the bug report [#457](https://github.com/stackabletech/spark-k8s-operator/issues/457) for details.
pub fn k8s_job_has_been_created(&self) -> bool {
self.status
.as_ref()
.map(|s| !s.phase.is_empty())
.unwrap_or_default()
}

pub fn submit_job_config_map_name(&self) -> String {
format!("{app_name}-submit-job", app_name = self.name_any())
}
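The new helper can be exercised in isolation. A minimal, self-contained sketch (the two structs below are stand-ins for the real CRD types, reduced to the single field the check reads; the method body matches the diff):

```rust
// Minimal stand-ins for the CRD types; only the field the check reads.
struct SparkApplicationStatus {
    phase: String,
}

struct SparkApplication {
    status: Option<SparkApplicationStatus>,
}

impl SparkApplication {
    // Same logic as the helper in the diff: the submit Job counts as created
    // once a status with a non-empty phase has been written.
    fn k8s_job_has_been_created(&self) -> bool {
        self.status
            .as_ref()
            .map(|s| !s.phase.is_empty())
            .unwrap_or_default()
    }
}

fn main() {
    let fresh = SparkApplication { status: None };
    let submitted = SparkApplication {
        status: Some(SparkApplicationStatus {
            phase: "Unknown".to_string(),
        }),
    };
    // A freshly created application has no status, so no Job exists yet;
    // once any phase is recorded, the Job is considered created.
    assert!(!fresh.k8s_job_has_been_created());
    assert!(submitted.k8s_job_has_been_created());
}
```

Note that `unwrap_or_default()` on `Option<bool>` yields `false`, so an absent status is treated the same as an empty phase.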
32 changes: 31 additions & 1 deletion rust/operator-binary/src/spark_k8s_controller.rs
@@ -10,7 +10,7 @@ use product_config::writer::to_java_properties_string;
use stackable_operator::time::Duration;
use stackable_spark_k8s_crd::{
constants::*, s3logdir::S3LogDir, tlscerts, RoleConfig, SparkApplication, SparkApplicationRole,
SparkContainer, SubmitConfig,
SparkApplicationStatus, SparkContainer, SubmitConfig,
};

use crate::product_logging::{self, resolve_vector_aggregator_address};
@@ -155,6 +155,12 @@ pub enum Error {
CreateVolumes {
source: stackable_spark_k8s_crd::Error,
},

#[snafu(display("Failed to update status for application {name:?}"))]
ApplySparkApplicationStatus {
source: stackable_operator::client::Error,
name: String,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -170,6 +176,14 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)

let client = &ctx.client;

if spark_application.k8s_job_has_been_created() {
tracing::info!(
spark_application = spark_application.name_any(),
            "Skipped reconciling SparkApplication with non-empty status"
);
return Ok(Action::await_change());
}

let opt_s3conn = match spark_application.spec.s3connection.as_ref() {
Some(s3bd) => s3bd
.resolve(
Expand Down Expand Up @@ -346,6 +360,22 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.await
.context(ApplyApplicationSnafu)?;

// Fix for #457
// Update the status of the SparkApplication immediately after creating the Job
// to ensure the Job is not created again after being recycled by Kubernetes.
client
.apply_patch_status(
CONTROLLER_NAME,
spark_application.as_ref(),
&SparkApplicationStatus {
phase: "Unknown".to_string(),
},
)
.await
.with_context(|_| ApplySparkApplicationStatusSnafu {
name: spark_application.name_any(),
})?;

Ok(Action::await_change())
}

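Taken together, the guard at the top of `reconcile` and the status patch immediately after Job creation make the submit effectively at-most-once. A condensed control-flow sketch (the Kubernetes API calls are stubbed out with a counter; all names other than the phase string are stand-ins, not the operator's real types):

```rust
// Control-flow sketch of the fix for #457.
struct Status {
    phase: String,
}

struct App {
    status: Option<Status>,
}

fn job_already_created(app: &App) -> bool {
    app.status.as_ref().map(|s| !s.phase.is_empty()).unwrap_or_default()
}

// `jobs_created` stands in for the API call that creates the submit Job.
fn reconcile(app: &mut App, jobs_created: &mut u32) {
    // Guard from the top of `reconcile`: a non-empty status means the
    // submit Job was already created in an earlier run.
    if job_already_created(app) {
        return;
    }
    *jobs_created += 1; // create the spark-submit Job (stubbed)
    // Patch the status right away, so the marker survives even after
    // Kubernetes garbage-collects the finished Job.
    app.status = Some(Status { phase: "Unknown".to_string() });
}

fn main() {
    let mut app = App { status: None };
    let mut jobs = 0;
    // Simulate repeated reconciles, e.g. across operator restarts:
    // only the first run creates a Job.
    for _ in 0..3 {
        reconcile(&mut app, &mut jobs);
    }
    assert_eq!(jobs, 1);
}
```

The key ordering decision is that the status is patched in the same reconcile pass that creates the Job, so the persisted marker outlives the Job object itself.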