fix: ensure a spark application can only be submitted once #460

Merged · 12 commits · Sep 16, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
### Fixed

- Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]).
- Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]).

### Removed

@@ -24,6 +25,7 @@ All notable changes to this project will be documented in this file.
[#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450
[#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451
[#459]: https://github.com/stackabletech/spark-k8s-operator/pull/459
[#460]: https://github.com/stackabletech/spark-k8s-operator/pull/460

## [24.7.0] - 2024-07-24

@@ -0,0 +1,7 @@
= Spark Applications

Spark applications are submitted to the Spark Operator as SparkApplication resources. These resources are used to define the configuration of the Spark job, including the image to use, the main application file, and the number of executors to start.

Upon creation, the application's status is set to `Unknown`. As the operator creates the necessary resources, the application's status transitions through phases that reflect the phase of the driver Pod. A successful application eventually reaches the `Succeeded` phase.

NOTE: The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created.
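To make the lifecycle described above more concrete, here is a minimal, hypothetical Rust sketch of how an application phase can mirror the driver Pod phase. The `DriverPodPhase` enum and `application_phase` function are illustrative stand-ins and are not the operator's actual code; only the `Unknown` and `Succeeded` phases are taken from the documentation text above.

```rust
// Hypothetical sketch only: the real phase handling lives in the operator and
// is not shown on this page. It illustrates the lifecycle described above,
// where the application status mirrors the driver Pod phase and starts out as
// "Unknown" before a driver Pod exists.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DriverPodPhase {
    Pending,
    Running,
    Succeeded,
    Failed,
    Unknown,
}

/// Derive the phase reported in the SparkApplication status from the driver Pod.
fn application_phase(driver_pod: Option<DriverPodPhase>) -> &'static str {
    match driver_pod {
        // No driver Pod yet: the status starts out as "Unknown".
        None => "Unknown",
        Some(DriverPodPhase::Pending) => "Pending",
        Some(DriverPodPhase::Running) => "Running",
        // A successful application eventually reaches "Succeeded".
        Some(DriverPodPhase::Succeeded) => "Succeeded",
        Some(DriverPodPhase::Failed) => "Failed",
        Some(DriverPodPhase::Unknown) => "Unknown",
    }
}

fn main() {
    assert_eq!(application_phase(None), "Unknown");
    assert_eq!(application_phase(Some(DriverPodPhase::Succeeded)), "Succeeded");
}
```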
1 change: 1 addition & 0 deletions docs/modules/spark-k8s/partials/nav.adoc
@@ -11,6 +11,7 @@
** xref:spark-k8s:usage-guide/history-server.adoc[]
** xref:spark-k8s:usage-guide/examples.adoc[]
** xref:spark-k8s:usage-guide/operations/index.adoc[]
*** xref:spark-k8s:usage-guide/operations/applications.adoc[]
*** xref:spark-k8s:usage-guide/operations/pod-placement.adoc[]
*** xref:spark-k8s:usage-guide/operations/pod-disruptions.adoc[]
*** xref:spark-k8s:usage-guide/operations/graceful-shutdown.adoc[]
33 changes: 32 additions & 1 deletion rust/operator-binary/src/spark_k8s_controller.rs
@@ -10,7 +10,7 @@ use product_config::writer::to_java_properties_string;
use stackable_operator::time::Duration;
use stackable_spark_k8s_crd::{
constants::*, s3logdir::S3LogDir, tlscerts, RoleConfig, SparkApplication, SparkApplicationRole,
SparkContainer, SubmitConfig,
SparkApplicationStatus, SparkContainer, SubmitConfig,
};

use crate::product_logging::{self, resolve_vector_aggregator_address};
@@ -155,6 +155,11 @@ pub enum Error {
CreateVolumes {
source: stackable_spark_k8s_crd::Error,
},
#[snafu(display("Failed to update status for application [{name}]"))]
ApplySparkApplicationStatus {
source: stackable_operator::client::Error,
name: String,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -170,6 +175,16 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)

let client = &ctx.client;

// Fix for #457
// Skip reconciling the SparkApplication if it has a non-empty status.
if spark_application.status.is_some() {
tracing::info!(
"Skip reconciling SparkApplication [{}] with non-empty status",
spark_application.name_any()
);
return Ok(Action::await_change());
}

let opt_s3conn = match spark_application.spec.s3connection.as_ref() {
Some(s3bd) => s3bd
.resolve(
@@ -346,6 +361,22 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.await
.context(ApplyApplicationSnafu)?;

// Fix for #457
// Update the status of the SparkApplication immediately after creating the Job
// to ensure the Job is not created again after being recycled by Kubernetes.
client
.apply_patch_status(
CONTROLLER_NAME,
spark_application.as_ref(),
&SparkApplicationStatus {
phase: "Unknown".to_string(),
},
)
.await
.with_context(|_| ApplySparkApplicationStatusSnafu {
name: spark_application.name_any(),
})?;

Ok(Action::await_change())
}

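For reviewers, here is a condensed, self-contained sketch of the submit-once guard this PR introduces: skip reconciliation when a status is already present, otherwise create the submit Job and immediately patch a status so every later reconciliation hits the guard. The types below (`SparkApplication`, `SparkApplicationStatus`, `Action`) are simplified stand-ins, not the `stackable_operator` or `stackable_spark_k8s_crd` APIs, and the application name `spark-pi` is made up.

```rust
// Simplified stand-ins for the real CRD and controller types; this is a sketch
// of the guard pattern, not the operator's actual implementation.

#[derive(Clone, Debug)]
struct SparkApplicationStatus {
    phase: String,
}

#[derive(Clone, Debug)]
struct SparkApplication {
    name: String,
    status: Option<SparkApplicationStatus>,
}

#[derive(Debug, PartialEq)]
enum Action {
    AwaitChange,
}

fn reconcile(app: &mut SparkApplication) -> Action {
    // 1. A non-empty status means the submit Job was already created in an
    //    earlier reconciliation: do nothing, so the Job is never re-created
    //    after Kubernetes cleans it up.
    if app.status.is_some() {
        println!(
            "Skip reconciling SparkApplication [{}] with non-empty status",
            app.name
        );
        return Action::AwaitChange;
    }

    // 2. ... build and apply the submit Job here (elided) ...
    println!("Creating submit Job for SparkApplication [{}]", app.name);

    // 3. Patch a status immediately after creating the Job so the guard in
    //    step 1 fires on every subsequent reconciliation of this resource.
    app.status = Some(SparkApplicationStatus {
        phase: "Unknown".to_string(),
    });

    Action::AwaitChange
}

fn main() {
    let mut app = SparkApplication {
        name: "spark-pi".to_string(), // hypothetical application name
        status: None,
    };
    assert_eq!(reconcile(&mut app), Action::AwaitChange); // submits, sets phase "Unknown"
    assert_eq!(reconcile(&mut app), Action::AwaitChange); // skipped: status already set
}
```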