Skip to content

Commit

Permalink
fix: property v2 for duration in Azure Data Factory
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Oct 23, 2024
1 parent 02ec3f3 commit ae016d0
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.property.Property;
Expand Down Expand Up @@ -116,17 +115,15 @@ public class CreateRun extends AbstractAzureIdentityConnection implements Runnab
title = "Wait until completion duration",
description = "Maximum duration to wait for the pipeline to resolve. After this time the task will time out"
)
@PluginProperty
@Builder.Default
private Duration waitUntilCompletion = Duration.ofHours(1L);
private Property<Duration> waitUntilCompletion = Property.of(Duration.ofHours(1L));

@Schema(
title = "Completion check interval",
description = "The frequency with which the task checks whether the pipeline has completed."
)
@Builder.Default
@PluginProperty
private final Duration completionCheckInterval = Duration.ofSeconds(5L);
private final Property<Duration> completionCheckInterval = Property.of(Duration.ofSeconds(5L));

@Override
public CreateRun.Output run(RunContext runContext) throws Exception {
Expand Down Expand Up @@ -166,6 +163,8 @@ public CreateRun.Output run(RunContext runContext) throws Exception {
.runId(runId)
.build();
}
final Duration completionCheckIntervalRendered = runContext.render(completionCheckInterval, Duration.class);
final Duration waitUntilCompletionRendered = runContext.render(waitUntilCompletion, Duration.class);

final AtomicReference<PipelineRun> runningPipelineResponse = new AtomicReference<>();
try {
Expand All @@ -178,7 +177,7 @@ public CreateRun.Output run(RunContext runContext) throws Exception {
}

return PIPELINE_SUCCEEDED_STATUS.equals(runStatus);
}, completionCheckInterval, waitUntilCompletion);
}, completionCheckIntervalRendered, waitUntilCompletionRendered);
} catch (TimeoutException | RuntimeException e) {
logger.error("Pipeline '{}' with runId '{} finished with status '{}'", pipelineName, runId, runningPipelineResponse.get().status());
throw new RuntimeException(runningPipelineResponse.get().message());
Expand Down

0 comments on commit ae016d0

Please sign in to comment.