Skip to content

Commit

Permalink
refactor: dynamic properties (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Dec 9, 2024
1 parent bd8e516 commit f930357
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 81 deletions.
68 changes: 28 additions & 40 deletions src/main/java/io/kestra/plugin/spark/AbstractSubmit.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.ScriptService;
Expand Down Expand Up @@ -47,71 +48,58 @@ public abstract class AbstractSubmit extends Task implements RunnableTask<Script
title = "Spark master hostname for the application.",
description = "Spark master URL [formats](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls)."
)
@PluginProperty(dynamic = true)
@NotNull
private String master;
private Property<String> master;

@Schema(
title = "Spark application name."
)
@PluginProperty(dynamic = true)
private String name;
private Property<String> name;

@Schema(
title = "Command line arguments for the application."
)
@PluginProperty(dynamic = true)
private List<String> args;
private Property<List<String>> args;

@Schema(
title = "Adds a file to be submitted with the application.",
description = "Must be an internal storage URI."
)
@PluginProperty(dynamic = true, additionalProperties = String.class)
private Map<String, String> appFiles;
private Property<Map<String, String>> appFiles;

@Schema(
title = "Enables verbose reporting."
)
@PluginProperty
@Builder.Default
private Boolean verbose = false;
private Property<Boolean> verbose = Property.of(false);

@Schema(
title = "Configuration properties for the application."
)
@PluginProperty(dynamic = true, additionalProperties = String.class)
private Map<String, String> configurations;
private Property<Map<String, String>> configurations;


@Schema(
title = "Deploy mode for the application."
)
@PluginProperty(dynamic = true)
private DeployMode deployMode;
private Property<DeployMode> deployMode;

@Schema(
title = "The `spark-submit` binary path."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String sparkSubmitPath = "spark-submit";
private Property<String> sparkSubmitPath = Property.of("spark-submit");

@Schema(
title = "Additional environment variables for the current process."
)
@PluginProperty(
additionalProperties = String.class,
dynamic = true
)
protected Map<String, String> env;
protected Property<Map<String, String>> env;

@Schema(
title = "Script runner to use.",
description = "Deprecated - use 'taskRunner' instead."
)
@PluginProperty
protected RunnerType runner;
protected Property<RunnerType> runner;

@Schema(
title = "Deprecated, use 'taskRunner' instead"
Expand Down Expand Up @@ -140,7 +128,7 @@ protected DockerOptions injectDefaults(DockerOptions original) {
if (original == null) {
return null;
}

var builder = original.toBuilder();
if (original.getImage() == null) {
builder.image(DEFAULT_IMAGE);
Expand All @@ -152,36 +140,37 @@ protected DockerOptions injectDefaults(DockerOptions original) {
@Override
public ScriptOutput run(RunContext runContext) throws Exception {
SparkLauncher spark = new KestraSparkLauncher(this.envs(runContext))
.setMaster(runContext.render(master))
.setVerbose(this.verbose);
.setMaster(runContext.render(master).as(String.class).orElseThrow())
.setVerbose(runContext.render(verbose).as(Boolean.class).orElseThrow());

if (this.name != null) {
spark.setAppName(runContext.render(this.name));
spark.setAppName(runContext.render(this.name).as(String.class).orElseThrow());
}

if (this.configurations != null) {
this.configurations.forEach(throwBiConsumer((key, value) ->
spark.setConf(runContext.render(key), runContext.render(value))
));
runContext.render(this.configurations).asMap(String.class, String.class)
.forEach(throwBiConsumer(spark::setConf));
}

if (this.args != null) {
runContext.render(this.args).forEach(throwConsumer(spark::addAppArgs));
            runContext.render(this.args).asList(String.class)
                .forEach(throwConsumer(spark::addAppArgs));
}

if (this.appFiles != null) {
this.appFiles.forEach(throwBiConsumer((key, value) -> spark.addFile(this.tempFile(runContext, key, value))));
runContext.render(this.appFiles).asMap(String.class, String.class)
.forEach(throwBiConsumer((key, val) -> spark.addFile(this.tempFile(runContext, key, val))));
}

this.configure(runContext, spark);

List<String> commandsArgs = new ArrayList<>();
commandsArgs.add(this.sparkSubmitPath);
commandsArgs.add(runContext.render(this.sparkSubmitPath).as(String.class).orElseThrow());
commandsArgs.addAll(((KestraSparkLauncher) spark).getCommands());

return new CommandsWrapper(runContext)
.withEnv(this.envs(runContext))
.withRunnerType(this.runner)
            .withRunnerType(runContext.render(this.runner).as(RunnerType.class).orElse(null))
.withDockerOptions(injectDefaults(this.getDocker()))
.withTaskRunner(this.taskRunner)
.withContainerImage(this.containerImage)
Expand All @@ -196,20 +185,19 @@ public ScriptOutput run(RunContext runContext) throws Exception {
private Map<String, String> envs(RunContext runContext) throws IllegalVariableEvaluationException {
HashMap<String, String> result = new HashMap<>();

if (this.env != null) {
this.env.forEach(throwBiConsumer((s, s2) -> {
runContext.render(this.env).asMap(String.class, String.class)
.forEach(throwBiConsumer((s, s2) -> {
result.put(runContext.render(s), runContext.render(s2));
}));
}

return result;
}

protected String tempFile(RunContext runContext, String name, String url) throws IOException, IllegalVariableEvaluationException, URISyntaxException {
File file = runContext.workingDir().resolve(Path.of(runContext.render(name))).toFile();
protected String tempFile(RunContext runContext, String name, String url) throws IOException, URISyntaxException {
File file = runContext.workingDir().resolve(Path.of(name)).toFile();

try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
URI from = new URI(runContext.render(url));
URI from = new URI(url);
IOUtils.copyLarge(runContext.storage().getFile(from), fileOutputStream);

return file.getAbsoluteFile().toString();
Expand Down
19 changes: 8 additions & 11 deletions src/main/java/io/kestra/plugin/spark/JarSubmit.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
Expand Down Expand Up @@ -51,33 +52,29 @@ public class JarSubmit extends AbstractSubmit {
description = "This should be the location of a JAR file for Scala/Java applications, or a Python script for PySpark applications.\n" +
"Must be an internal storage URI."
)
@PluginProperty(dynamic = true)
@NotNull
private String mainResource;
private Property<String> mainResource;

@Schema(
title = "The application class name for Java/Scala applications."
)
@PluginProperty(dynamic = true)
@NotNull
private String mainClass;
private Property<String> mainClass;

@Schema(
title = "Additional JAR files to be submitted with the application.",
description = "Must be an internal storage URI."
)
@PluginProperty(dynamic = true, additionalProperties = String.class)
private Map<String, String> jars;
private Property<Map<String, String>> jars;

@Override
protected void configure(RunContext runContext, SparkLauncher spark) throws Exception {
String appJar = this.tempFile(runContext, "app.jar", this.mainResource);
String appJar = this.tempFile(runContext, "app.jar", runContext.render(this.mainResource).as(String.class).orElseThrow());
spark.setAppResource("file://" + appJar);

spark.setMainClass(runContext.render(mainClass));
spark.setMainClass(runContext.render(mainClass).as(String.class).orElseThrow());

if (this.jars != null) {
this.jars.forEach(throwBiConsumer((key, value) -> spark.addJar(this.tempFile(runContext, key, value))));
}
runContext.render(jars).asMap(String.class, String.class)
.forEach(throwBiConsumer((key, value) -> spark.addJar(this.tempFile(runContext, key, value))));
}
}
29 changes: 14 additions & 15 deletions src/main/java/io/kestra/plugin/spark/PythonSubmit.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -51,26 +52,26 @@
from random import random
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("PythonPi") \
.getOrCreate()
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_: int) -> float:
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 <= 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
spark.stop()
spark.stop()
"""
)
}
Expand All @@ -79,29 +80,27 @@ public class PythonSubmit extends AbstractSubmit {
@Schema(
title = "The main Python script."
)
@PluginProperty(dynamic = true)
@NotNull
private String mainScript;
private Property<String> mainScript;

@Schema(
title = "Adds a Python file/zip/egg package to be submitted with the application.",
description = "Must be an internal storage URI."
)
@PluginProperty(dynamic = true, additionalProperties = String.class)
private Map<String, String> pythonFiles;
private Property<Map<String, String>> pythonFiles;

@Override
protected void configure(RunContext runContext, SparkLauncher spark) throws Exception {
Path path = runContext.workingDir().createTempFile(".py");
try (FileWriter fileWriter = new FileWriter(path.toFile())) {
IOUtils.write(runContext.render(this.mainScript), fileWriter);
IOUtils.write(runContext.render(this.mainScript).as(String.class).orElseThrow(), fileWriter);
fileWriter.flush();
}

spark.setAppResource("file://" + path.toFile().getAbsolutePath());

if (this.pythonFiles != null) {
this.pythonFiles.forEach(throwBiConsumer((key, value) -> spark.addPyFile(this.tempFile(runContext, key, value))));
}
runContext.render(this.pythonFiles).asMap(String.class, String.class)
.forEach(throwBiConsumer((key, value) -> spark.addPyFile(this.tempFile(runContext, key, value))));

}
}
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/spark/RSubmit.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -44,9 +45,9 @@
mainScript: |
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session()
print("The SparkR session has initialized successfully.")
sparkR.stop()
"""
)
Expand All @@ -56,16 +57,15 @@ public class RSubmit extends AbstractSubmit {
@Schema(
title = "The main R script."
)
@PluginProperty(dynamic = true)
@NotNull
private String mainScript;
private Property<String> mainScript;


@Override
protected void configure(RunContext runContext, SparkLauncher spark) throws Exception {
Path path = runContext.workingDir().createTempFile(".R");
try (FileWriter fileWriter = new FileWriter(path.toFile())) {
IOUtils.write(runContext.render(this.mainScript), fileWriter);
IOUtils.write(runContext.render(this.mainScript).as(String.class).orElseThrow(), fileWriter);
fileWriter.flush();
}

Expand Down
9 changes: 5 additions & 4 deletions src/test/java/io/kestra/plugin/spark/JarSubmitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -54,17 +55,17 @@ void jar() throws Exception {
JarSubmit task = JarSubmit.builder()
.id("unit-test")
.type(JarSubmit.class.getName())
.master("spark://localhost:37077")
.runner(RunnerType.DOCKER)
.master(Property.of("spark://localhost:37077"))
.runner(Property.of(RunnerType.DOCKER))
.docker(DockerOptions.builder()
.image("bitnami/spark:3.4.1")
.entryPoint(List.of(""))
.networkMode("host")
.user("root")
.build()
)
.mainClass("spark.samples.App")
.mainResource(put.toString())
.mainClass(Property.of("spark.samples.App"))
.mainResource(Property.of(put.toString()))
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
Expand Down
Loading

0 comments on commit f930357

Please sign in to comment.