fix: ensure a spark application can only be submitted once #460

Merged · 12 commits · Sep 16, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
### Fixed

- Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]).
- Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]).

### Removed

@@ -24,6 +25,7 @@ All notable changes to this project will be documented in this file.
[#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450
[#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451
[#459]: https://github.com/stackabletech/spark-k8s-operator/pull/459
[#460]: https://github.com/stackabletech/spark-k8s-operator/pull/460

## [24.7.0] - 2024-07-24

7 changes: 7 additions & 0 deletions docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc
@@ -0,0 +1,7 @@
= Spark Applications

Spark applications are submitted to the Spark Operator as SparkApplication resources. These resources define the configuration of the Spark job, including the image to use, the main application file, and the number of executors to start.

Upon creation, the application's status is set to `Unknown`. As the operator creates the necessary resources, the application's status transitions through phases that reflect the phase of the driver Pod. A successful application eventually reaches the `Succeeded` phase.

NOTE: The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created.
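The phase handling described above can be pictured with a short sketch. This is illustrative only and assumes the application phase simply mirrors the driver Pod's phase string; the operator's actual mapping may be more involved:

```rust
// Illustrative sketch (not the operator's actual code): derive an
// application phase from the driver Pod's phase, as described above.
// `driver_pod_phase` is `None` while no driver Pod exists yet.
fn application_phase(driver_pod_phase: Option<&str>) -> &'static str {
    match driver_pod_phase {
        None => "Unknown",                // resources not created yet
        Some("Pending") => "Pending",
        Some("Running") => "Running",
        Some("Succeeded") => "Succeeded", // a successful application ends here
        Some("Failed") => "Failed",
        Some(_) => "Unknown",
    }
}

fn main() {
    assert_eq!(application_phase(None), "Unknown");
    assert_eq!(application_phase(Some("Succeeded")), "Succeeded");
}
```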
1 change: 1 addition & 0 deletions docs/modules/spark-k8s/partials/nav.adoc
@@ -11,6 +11,7 @@
** xref:spark-k8s:usage-guide/history-server.adoc[]
** xref:spark-k8s:usage-guide/examples.adoc[]
** xref:spark-k8s:usage-guide/operations/index.adoc[]
*** xref:spark-k8s:usage-guide/operations/applications.adoc[]
*** xref:spark-k8s:usage-guide/operations/pod-placement.adoc[]
*** xref:spark-k8s:usage-guide/operations/pod-disruptions.adoc[]
*** xref:spark-k8s:usage-guide/operations/graceful-shutdown.adoc[]
15 changes: 15 additions & 0 deletions rust/crd/src/lib.rs
@@ -230,6 +230,21 @@ pub struct JobDependencies {
}

impl SparkApplication {
/// Returns whether this [`SparkApplication`] has already created the Kubernetes Job that performs the actual `spark-submit`.
///
/// This is needed because Kubernetes will remove the succeeded Job after some time. When the spark-k8s-operator is
/// restarted, it would re-create the Job, resulting in the Spark job running multiple times. This function assumes
/// that the [`SparkApplication`]'s status will always be set when the Kubernetes Job is created. It therefore
/// checks if the status is set to determine if the Job was already created in the past.
///
/// See the bug report [#457](https://github.com/stackabletech/spark-k8s-operator/issues/457) for details.
pub fn k8s_job_has_been_created(&self) -> bool {
self.status
.as_ref()
.map(|s| !s.phase.is_empty())
.unwrap_or_default()
}

pub fn submit_job_config_map_name(&self) -> String {
format!("{app_name}-submit-job", app_name = self.name_any())
}
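The guard added above can be exercised in isolation. The following self-contained sketch uses simplified stand-in types (only the `phase` field touched by this PR is modeled, not the real generated CRD structs) to show how `k8s_job_has_been_created()` treats each possible status shape:

```rust
// Stand-in types for illustration; the real structs are generated from the CRD.
struct SparkApplicationStatus {
    phase: String,
}

struct SparkApplication {
    status: Option<SparkApplicationStatus>,
}

impl SparkApplication {
    /// Same logic as in rust/crd/src/lib.rs: a missing status or an empty
    /// phase means the submit Job has not been created yet.
    fn k8s_job_has_been_created(&self) -> bool {
        self.status
            .as_ref()
            .map(|s| !s.phase.is_empty())
            .unwrap_or_default()
    }
}

fn main() {
    // Freshly created application: no status yet, so the Job must be created.
    let fresh = SparkApplication { status: None };
    assert!(!fresh.k8s_job_has_been_created());

    // Once the operator has patched the status (e.g. to "Unknown"), the check
    // reports true, even after Kubernetes garbage-collects the finished Job.
    let submitted = SparkApplication {
        status: Some(SparkApplicationStatus {
            phase: "Unknown".to_string(),
        }),
    };
    assert!(submitted.k8s_job_has_been_created());
}
```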
32 changes: 31 additions & 1 deletion rust/operator-binary/src/spark_k8s_controller.rs
@@ -10,7 +10,7 @@ use product_config::writer::to_java_properties_string;
use stackable_operator::time::Duration;
use stackable_spark_k8s_crd::{
constants::*, s3logdir::S3LogDir, tlscerts, RoleConfig, SparkApplication, SparkApplicationRole,
SparkContainer, SubmitConfig,
SparkApplicationStatus, SparkContainer, SubmitConfig,
};

use crate::product_logging::{self, resolve_vector_aggregator_address};
@@ -155,6 +155,12 @@ pub enum Error {
CreateVolumes {
source: stackable_spark_k8s_crd::Error,
},

#[snafu(display("Failed to update status for application [{name}]"))]
ApplySparkApplicationStatus {
source: stackable_operator::client::Error,
name: String,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -170,6 +176,14 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)

let client = &ctx.client;

if spark_application.k8s_job_has_been_created() {
tracing::info!(
spark_application = spark_application.name_any(),
"Skipped reconciling SparkApplication with non empty status"
);
return Ok(Action::await_change());
}

let opt_s3conn = match spark_application.spec.s3connection.as_ref() {
Some(s3bd) => s3bd
.resolve(
@@ -346,6 +360,22 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.await
.context(ApplyApplicationSnafu)?;

// Fix for #457
// Update the status of the SparkApplication immediately after creating the Job
// to ensure the Job is not created again after being recycled by Kubernetes.
client
.apply_patch_status(
CONTROLLER_NAME,
spark_application.as_ref(),
&SparkApplicationStatus {
phase: "Unknown".to_string(),
},
)
.await
.with_context(|_| ApplySparkApplicationStatusSnafu {
name: spark_application.name_any(),
})?;

Ok(Action::await_change())
}
