Skip to content

Commit

Permalink
fix: property v2 for duration in Azure Data Factory
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Oct 23, 2024
1 parent 5b8a385 commit 088b9c2
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ public class CreateRun extends AbstractAzureIdentityConnection implements Runnab
@Builder.Default
private Property<Boolean> wait = Property.of(Boolean.TRUE);

@Schema(
title = "Wait until completion duration",
description = "Maximum duration to wait for the pipeline to resolve. After this time the task will time out"
)
@Builder.Default
private Property<Duration> waitUntilCompletion = Property.of(Duration.ofHours(1L));

@Schema(
title = "Completion check interval",
description = "The frequency with which the task checks whether the pipeline has completed."
)
@Builder.Default
private final Property<Duration> completionCheckInterval = Property.of(Duration.ofSeconds(5L));

@Override
public CreateRun.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
Expand Down Expand Up @@ -151,6 +165,8 @@ public CreateRun.Output run(RunContext runContext) throws Exception {
.runId(runId)
.build();
}
final Duration completionCheckIntervalRendered = runContext.render(completionCheckInterval, Duration.class);
final Duration waitUntilCompletionRendered = runContext.render(waitUntilCompletion, Duration.class);

final AtomicReference<PipelineRun> runningPipelineResponse = new AtomicReference<>();
try {
Expand All @@ -163,7 +179,7 @@ public CreateRun.Output run(RunContext runContext) throws Exception {
}

return PIPELINE_SUCCEEDED_STATUS.equals(runStatus);
}, COMPLETION_CHECK_INTERVAL, WAIT_UNTIL_COMPLETION);
}, completionCheckIntervalRendered, waitUntilCompletionRendered);
} catch (TimeoutException | RuntimeException e) {
logger.error("Pipeline '{}' with runId '{} finished with status '{}'", pipelineName, runId, runningPipelineResponse.get().status());
throw new RuntimeException(runningPipelineResponse.get().message());
Expand Down

0 comments on commit 088b9c2

Please sign in to comment.