Skip to content

Commit 23b713d

Browse files
committed
add RunTask.workflow support
Signed-off-by: Dmitrii Tikhomirov <[email protected]>
1 parent dbcfd33 commit 23b713d

File tree

9 files changed

+331
-0
lines changed

9 files changed

+331
-0
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder;
2828
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2929
import io.serverlessworkflow.impl.executors.RaiseExecutor.RaiseExecutorBuilder;
30+
import io.serverlessworkflow.impl.executors.RunTaskExecutor.RunTaskExecutorBuilder;
3031
import io.serverlessworkflow.impl.executors.SetExecutor.SetExecutorBuilder;
3132
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
3233
import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder;
@@ -76,6 +77,8 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
7677
return new ListenExecutorBuilder(position, task.getListenTask(), definition);
7778
} else if (task.getEmitTask() != null) {
7879
return new EmitExecutorBuilder(position, task.getEmitTask(), definition);
80+
} else if (task.getRunTask() != null) {
81+
return new RunTaskExecutorBuilder(position, task.getRunTask(), definition);
7982
}
8083
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
8184
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.RunTask;
19+
import io.serverlessworkflow.api.types.RunWorkflow;
20+
import io.serverlessworkflow.api.types.SubflowConfiguration;
21+
import io.serverlessworkflow.api.types.TaskBase;
22+
import io.serverlessworkflow.impl.TaskContext;
23+
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import io.serverlessworkflow.impl.WorkflowContext;
25+
import io.serverlessworkflow.impl.WorkflowDefinition;
26+
import io.serverlessworkflow.impl.WorkflowDefinitionId;
27+
import io.serverlessworkflow.impl.WorkflowModel;
28+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
29+
import java.util.Map;
30+
import java.util.concurrent.CompletableFuture;
31+
32+
public abstract class RunTaskExecutor<T extends TaskBase> extends RegularTaskExecutor<T> {
33+
34+
protected RunTaskExecutor(RegularTaskExecutorBuilder<T> builder) {
35+
super(builder);
36+
}
37+
38+
public static class RunTaskExecutorBuilder
39+
extends RegularTaskExecutor.RegularTaskExecutorBuilder<RunTask> {
40+
41+
protected RunTaskExecutorBuilder(
42+
WorkflowMutablePosition position, RunTask task, WorkflowDefinition definition) {
43+
super(position, task, definition);
44+
}
45+
46+
@Override
47+
protected RegularTaskExecutor<RunTask> buildInstance() {
48+
if (task.getRun().getRunWorkflow() != null) {
49+
return new RunWorkflowExecutor(this);
50+
} else {
51+
throw new RuntimeException("Unsupported run task type");
52+
}
53+
}
54+
}
55+
56+
public static class RunWorkflowExecutor extends RunTaskExecutor<RunTask> {
57+
58+
private final WorkflowApplication application;
59+
60+
protected RunWorkflowExecutor(RunTaskExecutorBuilder builder) {
61+
super(builder);
62+
this.application = builder.application;
63+
}
64+
65+
@Override
66+
protected CompletableFuture<WorkflowModel> internalExecute(
67+
WorkflowContext workflowContext, TaskContext taskContext) {
68+
if (taskContext.task() != null && taskContext.task() instanceof RunTask runTask) {
69+
RunWorkflow runWorkflow = runTask.getRun().getRunWorkflow();
70+
SubflowConfiguration subflowConfiguration = runWorkflow.getWorkflow();
71+
String name = subflowConfiguration.getName();
72+
String version = subflowConfiguration.getVersion();
73+
String namespace = subflowConfiguration.getNamespace();
74+
for (Map.Entry<WorkflowDefinitionId, WorkflowDefinition> kv :
75+
application.workflowDefinitions().entrySet()) {
76+
WorkflowDefinitionId workflowDefinitionId = kv.getKey();
77+
if (workflowDefinitionId.name().equals(name)
78+
&& workflowDefinitionId.namespace().equals(namespace)
79+
&& workflowDefinitionId.version().equals(version)) {
80+
WorkflowDefinition workflowDefinition = kv.getValue();
81+
WorkflowApplication workflowApplication = workflowDefinition.application();
82+
return workflowApplication
83+
.workflowDefinition(workflowDefinition.workflow())
84+
.instance(taskContext.input().asJavaObject())
85+
.start();
86+
}
87+
}
88+
throw new IllegalArgumentException(
89+
"Subflow workflow not found: name=%s version=%s namespace=%s"
90+
.formatted(name, version, namespace));
91+
}
92+
throw new RuntimeException("Task must be of type RunTask");
93+
}
94+
}
95+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.test;
17+
18+
import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
19+
import static org.junit.Assert.assertEquals;
20+
21+
import io.serverlessworkflow.api.types.Workflow;
22+
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import java.io.IOException;
24+
import java.util.Map;
25+
import org.junit.Test;
26+
27+
public class SubWorkflowTest {
28+
29+
private static final String WORKFLOW_TEST_PATH = "workflows-samples/sub-workflow/";
30+
31+
@Test
32+
public void setTest() throws IOException {
33+
Workflow workflowParent =
34+
readWorkflowFromClasspath(WORKFLOW_TEST_PATH + "sub-workflow-parent.yaml");
35+
Workflow workflowChild =
36+
readWorkflowFromClasspath(WORKFLOW_TEST_PATH + "sub-workflow-child.yaml");
37+
38+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
39+
app.workflowDefinition(workflowChild);
40+
Map<String, Object> result =
41+
app.workflowDefinition(workflowParent)
42+
.instance(Map.of())
43+
.start()
44+
.get()
45+
.asMap()
46+
.orElseThrow();
47+
assertEquals("1", result.get("counter").toString());
48+
assertEquals("helloWorld", result.get("greeting").toString());
49+
} catch (Exception e) {
50+
throw new RuntimeException("Workflow execution failed", e);
51+
}
52+
}
53+
54+
@Test
55+
public void setBlankInputTest() throws IOException {
56+
Workflow workflowParent =
57+
readWorkflowFromClasspath(WORKFLOW_TEST_PATH + "sub-workflow-parent.yaml");
58+
Workflow workflowChild =
59+
readWorkflowFromClasspath(WORKFLOW_TEST_PATH + "sub-workflow-child.yaml");
60+
61+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
62+
app.workflowDefinition(workflowChild);
63+
Map<String, Object> result =
64+
app.workflowDefinition(workflowParent)
65+
.instance(Map.of())
66+
.start()
67+
.get()
68+
.asMap()
69+
.orElseThrow();
70+
assertEquals("1", result.get("counter").toString());
71+
assertEquals("helloWorld", result.get("greeting").toString());
72+
} catch (Exception e) {
73+
throw new RuntimeException("Workflow execution failed", e);
74+
}
75+
}
76+
77+
@Test
78+
public void setStringInputTest() throws IOException {
79+
Workflow workflowParent =
80+
readWorkflowFromClasspath(WORKFLOW_TEST_PATH + "sub-workflow-parent.yaml");
81+
Workflow workflowChild =
82+
readWorkflowFromClasspath(WORKFLOW_TEST_PATH + "sub-workflow-child.yaml");
83+
84+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
85+
app.workflowDefinition(workflowChild);
86+
Map<String, Object> result =
87+
app.workflowDefinition(workflowParent)
88+
.instance("Tested")
89+
.start()
90+
.get()
91+
.asMap()
92+
.orElseThrow();
93+
assertEquals("1", result.get("counter").toString());
94+
assertEquals("helloWorld", result.get("greeting").toString());
95+
} catch (Exception e) {
96+
throw new RuntimeException("Workflow execution failed", e);
97+
}
98+
}
99+
100+
@Test
101+
public void readContextAndSetTest() throws IOException {
102+
Workflow workflowParent =
103+
readWorkflowFromClasspath(
104+
WORKFLOW_TEST_PATH + "read-context-and-set-sub-workflow-parent.yaml");
105+
Workflow workflowChild =
106+
readWorkflowFromClasspath(
107+
WORKFLOW_TEST_PATH + "read-context-and-set-sub-workflow-child.yaml");
108+
Map<String, Object> map = Map.of("userId", "userId_1", "username", "test", "password", "test");
109+
110+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
111+
app.workflowDefinition(workflowChild);
112+
Map<String, Object> result =
113+
app.workflowDefinition(workflowParent).instance(map).start().get().asMap().orElseThrow();
114+
Map<String, String> updated = (Map<String, String>) result.get("updated");
115+
assertEquals("userId_1_tested", updated.get("userId"));
116+
assertEquals("test_tested", updated.get("username"));
117+
assertEquals("test_tested", updated.get("password"));
118+
assertEquals(
119+
"The workflow set-into-context:1.0.0 updated user in context", result.get("detail"));
120+
} catch (Exception e) {
121+
throw new RuntimeException("Workflow execution failed", e);
122+
}
123+
}
124+
125+
@Test
126+
public void outputExportContextAndSetTest() throws IOException {
127+
Workflow workflowParent =
128+
readWorkflowFromClasspath(
129+
WORKFLOW_TEST_PATH + "output-export-and-set-sub-workflow-parent.yaml");
130+
Workflow workflowChild =
131+
readWorkflowFromClasspath(
132+
WORKFLOW_TEST_PATH + "output-export-and-set-sub-workflow-child.yaml");
133+
Map<String, Object> map = Map.of("userId", "userId_1", "username", "test", "password", "test");
134+
135+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
136+
app.workflowDefinition(workflowChild);
137+
Map<String, Object> result =
138+
app.workflowDefinition(workflowParent).instance(map).start().get().asMap().orElseThrow();
139+
assertEquals("userId_1_tested", result.get("userId"));
140+
assertEquals("test_tested", result.get("username"));
141+
assertEquals("test_tested", result.get("password"));
142+
} catch (Exception e) {
143+
throw new RuntimeException("Workflow execution failed", e);
144+
}
145+
}
146+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: default
4+
name: set-into-context
5+
version: '1.0.0'
6+
7+
do:
8+
- updateUser:
9+
set:
10+
updated:
11+
userId: '${ .userId + "_tested" }'
12+
username: '${ .username + "_tested" }'
13+
password: '${ .password + "_tested" }'
14+
output:
15+
as: .updated
16+
export:
17+
as: '.'
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: default
4+
name: parent
5+
version: '1.0.0'
6+
7+
do:
8+
- sayHello:
9+
run:
10+
workflow:
11+
namespace: default
12+
name: set-into-context
13+
version: '1.0.0'
14+
input:
15+
foo: bar
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: default
4+
name: set-into-context
5+
version: '1.0.0'
6+
7+
do:
8+
- updateUser:
9+
set:
10+
updated:
11+
userId: '${ .userId + "_tested" }'
12+
username: '${ .username + "_tested" }'
13+
password: '${ .password + "_tested" }'
14+
detail: '${ "The workflow " + $workflow.definition.document.name + ":" + $workflow.definition.document.version + " updated user in context" }'
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: default
4+
name: parent
5+
version: '1.0.0'
6+
7+
do:
8+
- sayHello:
9+
run:
10+
workflow:
11+
namespace: default
12+
name: set-into-context
13+
version: '1.0.0'
14+
input:
15+
foo: bar
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: default
4+
name: set-into-context
5+
version: '1.0.0'
6+
7+
do:
8+
- updateUser:
9+
set:
10+
greeting: "helloWorld"
11+
counter: 1
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: default
4+
name: parent
5+
version: '1.0.0'
6+
7+
do:
8+
- sayHello:
9+
run:
10+
workflow:
11+
namespace: default
12+
name: set-into-context
13+
version: '1.0.0'
14+
input:
15+
foo: bar

0 commit comments

Comments
 (0)