From 0ca259042af7f3dc3779512bc0710306e9edc3c7 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 18:55:17 +0200 Subject: [PATCH 1/2] Shutdown client properly Signed-off-by: Hongxin Liang --- .../flyte/jflyte/ExecuteDynamicWorkflow.java | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index 6b0079846..cae1b1e07 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -203,44 +203,48 @@ static DynamicJobSpec rewrite( Map taskTemplates, Map workflowTemplates) { - WorkflowNodeVisitor workflowNodeVisitor = - IdentifierRewrite.builder() - .domain(executionConfig.domain()) - .project(executionConfig.project()) - .version(executionConfig.version()) - .adminClient( - FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) - .build() - .visitor(); - - List rewrittenNodes = - spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList()); - - Map usedSubWorkflows = - ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates); - - Map usedTaskTemplates = - ProjectClosure.collectTasks(rewrittenNodes, taskTemplates); - - // FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks - // and workflows - - Map rewrittenUsedSubWorkflows = - mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate); - - return spec.toBuilder() - .nodes(rewrittenNodes) - .subWorkflows( - ImmutableMap.builder() - .putAll(spec.subWorkflows()) - .putAll(rewrittenUsedSubWorkflows) - .build()) - .tasks( - ImmutableMap.builder() - .putAll(spec.tasks()) - .putAll(usedTaskTemplates) - .build()) - .build(); + try (FlyteAdminClient flyteAdminClient = + FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) { + + WorkflowNodeVisitor workflowNodeVisitor = + IdentifierRewrite.builder() + .domain(executionConfig.domain()) + .project(executionConfig.project()) + .version(executionConfig.version()) + .adminClient( + FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) + .build() + .visitor(); + + List rewrittenNodes = + spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList()); + + Map usedSubWorkflows = + ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates); + + Map usedTaskTemplates = + ProjectClosure.collectTasks(rewrittenNodes, taskTemplates); + + // FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks + // and workflows + + Map rewrittenUsedSubWorkflows = + mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate); + + return spec.toBuilder() + .nodes(rewrittenNodes) + .subWorkflows( + ImmutableMap.builder() + .putAll(spec.subWorkflows()) + .putAll(rewrittenUsedSubWorkflows) + .build()) + .tasks( + ImmutableMap.builder() + .putAll(spec.tasks()) + .putAll(usedTaskTemplates) + .build()) + .build(); + } } private static DynamicWorkflowTask getDynamicWorkflowTask(String name) { From b481159d33e02ad66e08827aa910d351eed605ae Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 18:57:49 +0200 Subject: [PATCH 2/2] Copy pasta Signed-off-by: Hongxin Liang --- .../src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index cae1b1e07..21cb3338e 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -211,8 +211,7 @@ static DynamicJobSpec rewrite( .domain(executionConfig.domain()) .project(executionConfig.project()) .version(executionConfig.version()) - .adminClient( - FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) + .adminClient(flyteAdminClient) .build() .visitor();