Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate to dynamic properties #84

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions src/main/java/io/kestra/plugin/singer/AbstractPythonSinger.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;
import io.kestra.core.models.tasks.runners.ScriptService;
Expand Down Expand Up @@ -65,22 +66,19 @@ public abstract class AbstractPythonSinger extends Task {
@Schema(
title = "The name of Singer state file stored in KV Store."
)
@PluginProperty(dynamic = true)
@NotNull
@Builder.Default
protected String stateName = "singer-state";
protected Property<String> stateName = Property.of("singer-state");

@Schema(
title = "Override default pip packages to use a specific version."
)
@PluginProperty(dynamic = true)
protected List<String> pipPackages;
protected Property<List<String>> pipPackages;

@Schema(
title = "Override default singer command."
)
@PluginProperty(dynamic = true)
protected String command;
protected Property<String> command;

@Schema(
title = "Deprecated, use 'taskRunner' instead"
Expand All @@ -98,9 +96,8 @@ public abstract class AbstractPythonSinger extends Task {
private TaskRunner taskRunner = Docker.instance();

@Schema(title = "The task runner container image, only used if the task runner is container-based.")
@PluginProperty(dynamic = true)
@Builder.Default
private String containerImage = DEFAULT_IMAGE;
private Property<String> containerImage = Property.of(DEFAULT_IMAGE);

protected DockerOptions injectDefaults(DockerOptions original) {
if (original == null) {
Expand All @@ -115,14 +112,16 @@ protected DockerOptions injectDefaults(DockerOptions original) {
return builder.build();
}

abstract public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException, IOException;
public abstract Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException, IOException;

abstract public List<String> pipPackages();
public abstract Property<List<String>> pipPackages();

abstract protected String command();
protected abstract Property<String> command();

protected String finalCommand(RunContext runContext) throws IllegalVariableEvaluationException {
return this.command != null ? runContext.render(this.command) : this.command();
return this.command != null ?
runContext.render(this.command).as(String.class).orElseThrow() :
runContext.render(this.command()).as(String.class).orElse(null);
}

protected void run(RunContext runContext, String command, AbstractLogConsumer logConsumer) throws Exception {
Expand All @@ -141,7 +140,7 @@ protected void run(RunContext runContext, String command, AbstractLogConsumer lo
.withWarningOnStdErr(true)
.withDockerOptions(this.injectDefaults(getDocker()))
.withTaskRunner(taskRunner)
.withContainerImage(this.containerImage)
.withContainerImage(runContext.render(this.containerImage).as(String.class).orElseThrow())
.withLogConsumer(logConsumer)
.withCommands(ScriptService.scriptCommands(
List.of("/bin/sh", "-c"),
Expand All @@ -156,7 +155,11 @@ protected void run(RunContext runContext, String command, AbstractLogConsumer lo
}

protected Stream<String> pipInstallCommands(RunContext runContext) throws Exception {
ArrayList<String> finalRequirements = new ArrayList<>(this.pipPackages != null ? runContext.render(this.pipPackages) : this.pipPackages());
ArrayList<String> finalRequirements = new ArrayList<>(
this.pipPackages != null ?
runContext.render(this.pipPackages).asList(String.class) :
runContext.render(this.pipPackages()).asList(String.class)
);
finalRequirements.add("python-json-logger");

return Stream.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void initEnvDiscoveryAndState(RunContext runContext) throws Exception {
if (this.features().contains(Feature.STATE)) {
try {
InputStream taskStateFile = runContext.stateStore().getState(
runContext.render(this.stateName),
runContext.render(this.stateName).as(String.class).orElseThrow(),
"state.json",
runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null)
);
Expand Down Expand Up @@ -102,7 +102,7 @@ public Output run(RunContext runContext) throws Exception {
.raw(runContext.storage().putFile(this.rawSingerStream.getLeft()));

if (this.features().contains(Feature.STATE)) {
this.saveState(runContext, runContext.render(this.stateName), this.stateRecords);
this.saveState(runContext, runContext.render(this.stateName).as(String.class).orElseThrow(), this.stateRecords);
}

return outputBuilder
Expand Down
22 changes: 10 additions & 12 deletions src/main/java/io/kestra/plugin/singer/taps/BigQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.models.Feature;
Expand All @@ -30,8 +31,7 @@ public class BigQuery extends AbstractPythonTap implements RunnableTask<Abstract
@Schema(
title = "The JSON service account key as string."
)
@PluginProperty(dynamic = true)
protected String serviceAccount;
protected Property<String> serviceAccount;

@NotNull
@NotEmpty
Expand All @@ -45,17 +45,15 @@ public class BigQuery extends AbstractPythonTap implements RunnableTask<Abstract
@Schema(
title = "Limits the number of records returned in each stream, applied as a limit in the query."
)
@PluginProperty
private Integer limit;
private Property<Integer> limit;

@NotNull
@Schema(
title = "When replicating incrementally, disable to only select records whose `datetime_key` is greater than the maximum value replicated in the last run, by excluding records whose timestamps match exactly.",
description = "This could cause records to be missed that were created after the last run finished, but during the same second and with the same timestamp."
)
@PluginProperty
@Builder.Default
private Boolean startAlwaysInclusive = true;
private Property<Boolean> startAlwaysInclusive = Property.of(true);

@NotNull
@Schema(
Expand Down Expand Up @@ -83,7 +81,7 @@ public List<Feature> features() {
public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put("streams", this.streams)
.put("start_always_inclusive", this.startAlwaysInclusive);
.put("start_always_inclusive", runContext.render(this.startAlwaysInclusive).as(Boolean.class).orElseThrow());

if (this.startDateTime != null) {
builder.put("start_datetime", runContext.render(this.startDateTime.toString()));
Expand All @@ -97,13 +95,13 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
}

@Override
public List<String> pipPackages() {
return List.of("git+https://github.com/kestra-io/tap-bigquery.git@fix");
public Property<List<String>> pipPackages() {
return Property.of(List.of("git+https://github.com/kestra-io/tap-bigquery.git@fix"));
}

@Override
protected String command() {
return "tap-bigquery";
protected Property<String> command() {
return Property.of("tap-bigquery");
}

@SuppressWarnings("DuplicatedCode")
Expand All @@ -112,7 +110,7 @@ protected Map<String, String> environmentVariables(RunContext runContext) throws
HashMap<String, String> env = new HashMap<>(super.environmentVariables(runContext));

if (this.serviceAccount != null) {
this.writeSingerFiles("google-credentials.json", runContext.render(this.serviceAccount));
this.writeSingerFiles("google-credentials.json", runContext.render(this.serviceAccount).as(String.class).orElseThrow());
env.put("GOOGLE_APPLICATION_CREDENTIALS", workingDirectory.toAbsolutePath() + "/google-credentials.json");
}

Expand Down
9 changes: 5 additions & 4 deletions src/main/java/io/kestra/plugin/singer/taps/BingAds.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.models.Feature;
Expand Down Expand Up @@ -107,12 +108,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
}

@Override
public List<String> pipPackages() {
return Collections.singletonList("tap-bing-ads");
public Property<List<String>> pipPackages() {
return Property.of(Collections.singletonList("tap-bing-ads"));
}

@Override
protected String command() {
return "tap-bing-ads";
protected Property<String> command() {
return Property.of("tap-bing-ads");
}
}
9 changes: 5 additions & 4 deletions src/main/java/io/kestra/plugin/singer/taps/ChargeBee.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.models.Feature;
Expand Down Expand Up @@ -84,12 +85,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
}

@Override
public List<String> pipPackages() {
return Collections.singletonList("git+https://github.com/hotgluexyz/tap-chargebee.git");
public Property<List<String>> pipPackages() {
return Property.of(Collections.singletonList("git+https://github.com/hotgluexyz/tap-chargebee.git"));
}

@Override
protected String command() {
return "tap-chargebee";
protected Property<String> command() {
return Property.of("tap-chargebee");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.models.Feature;
Expand Down Expand Up @@ -69,12 +70,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
}

@Override
public List<String> pipPackages() {
return Collections.singletonList("tap-exchangeratehost");
public Property<List<String>> pipPackages() {
return Property.of(Collections.singletonList("tap-exchangeratehost"));
}

@Override
protected String command() {
return "tap-exchangeratehost";
protected Property<String> command() {
return Property.of("tap-exchangeratehost");
}
}
14 changes: 7 additions & 7 deletions src/main/java/io/kestra/plugin/singer/taps/FacebookAds.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.models.Feature;
Expand Down Expand Up @@ -48,9 +49,8 @@ public class FacebookAds extends AbstractPythonTap implements RunnableTask<Abstr
@Schema(
title = "How many Days before the Start Date to fetch Ads Insights for."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Integer insightsBufferDays = 0;
private final Property<Integer> insightsBufferDays = Property.of(0);

@NotNull
@Schema(
Expand Down Expand Up @@ -79,7 +79,7 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put("account_id", runContext.render(this.accountId))
.put("access_token", runContext.render(this.accessToken))
.put("insights_buffer_days", this.insightsBufferDays)
.put("insights_buffer_days", runContext.render(this.insightsBufferDays).as(Integer.class).orElseThrow())
.put("start_date", runContext.render(this.startDate.toString()));

if (this.endDate != null) {
Expand All @@ -90,12 +90,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
}

@Override
public List<String> pipPackages() {
return Collections.singletonList("tap-facebook");
public Property<List<String>> pipPackages() {
return Property.of(Collections.singletonList("tap-facebook"));
}

@Override
protected String command() {
return "tap-facebook";
protected Property<String> command() {
return Property.of("tap-facebook");
}
}
9 changes: 5 additions & 4 deletions src/main/java/io/kestra/plugin/singer/taps/Fastly.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.models.Feature;
Expand Down Expand Up @@ -65,12 +66,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
}

@Override
public List<String> pipPackages() {
return Collections.singletonList("git+https://gitlab.com/meltano/tap-fastly.git");
public Property<List<String>> pipPackages() {
return Property.of(Collections.singletonList("git+https://gitlab.com/meltano/tap-fastly.git"));
}

@Override
protected String command() {
return "tap-fastly";
protected Property<String> command() {
return Property.of("tap-fastly");
}
}
18 changes: 9 additions & 9 deletions src/main/java/io/kestra/plugin/singer/taps/GenericTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.models.Feature;
Expand All @@ -11,6 +12,7 @@
import lombok.experimental.SuperBuilder;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -27,15 +29,13 @@ public class GenericTap extends AbstractPythonTap implements RunnableTask<Abstra
@Schema(
title = "The list of pip package to install."
)
@PluginProperty
private List<String> pipPackages;
private Property<List<String>> pipPackages;

@NotNull
@Schema(
title = "The command to start."
)
@PluginProperty
private String command;
private Property<String> command;

@NotNull
@Schema(
Expand All @@ -53,25 +53,25 @@ public class GenericTap extends AbstractPythonTap implements RunnableTask<Abstra
title = "The configuration to use",
description = "Will be save on config.json and used as arguments"
)
@PluginProperty(dynamic = true)
private Map<String, Object> configs;
private Property<Map<String, Object>> configs;

public List<Feature> features() {
return this.features;
}

@Override
public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException {
return runContext.render(configs);
var config = runContext.render(configs).asMap(String.class, Object.class);
return config.isEmpty() ? new HashMap<>() : config;
}

@Override
public List<String> pipPackages() {
public Property<List<String>> pipPackages() {
return this.pipPackages;
}

@Override
protected String command() {
protected Property<String> command() {
return this.command;
}
}
Loading
Loading