Skip to content

Commit 125907c

Browse files
committed
[FLINK-26570][statefun] Remote module configuration interpolation
1 parent 38f5518 commit 125907c

File tree

11 files changed

+304
-35
lines changed

11 files changed

+304
-35
lines changed

docs/content/docs/modules/overview.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,35 @@ spec:
6161
6262
A module YAML file can contain multiple YAML documents, separated by `---`, each representing a component to be included in the application.
6363
Each component is defined by a kind typename string and a spec object containing the component's properties.
64+
65+
# Configuration string interpolation
66+
You can use `${placeholders}` inside `spec` elements. These will be replaced by entries from a configuration map, consisting of:
67+
1. System properties
68+
2. Environment properties
69+
3. Command line arguments
70+
71+
where (3) override (2) which override (1).
72+
73+
Example:
74+
```yaml
75+
kind: io.statefun.endpoints.v2/http
76+
spec:
77+
functions: com.example/*
78+
urlPathTemplate: ${FUNC_PROTOCOL}://${FUNC_DNS}/{function.name}
79+
---
80+
kind: io.statefun.kafka.v1/ingress
81+
spec:
82+
id: com.example/my-ingress
83+
address: ${KAFKA_ADDRESS}:${KAFKA_PORT}
84+
consumerGroupId: my-consumer-group
85+
topics:
86+
- topic: ${KAFKA_INGRESS_TOPIC}
87+
(...)
88+
properties:
89+
- ssl.truststore.location: ${SSL_TRUSTSTORE_LOCATION}
90+
- ssl.truststore.password: ${SSL_TRUSTSTORE_PASSWORD}
91+
(...)
92+
```
93+
{{< hint info >}}
94+
Please note that `{function.name}` is not a placeholder to be replaced by entries from the merged configuration. See [url template]({{< ref "docs/modules/http-endpoint/#url-template" >}})
95+
{{< /hint >}}

statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void runWith() throws Throwable {
4848
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
4949
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
5050
.withBuildContextFileFromClasspath("ssl/", "ssl/")
51+
.withModuleGlobalConfiguration("MAX_NUM_BATCH_REQUESTS", "10000")
5152
.dependsOn(remoteFunction);
5253

5354
SmokeRunner.run(parameters, builder);

statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ kind: io.statefun.endpoints.v2/http
1717
spec:
1818
functions: statefun.smoke.e2e/command-interpreter-fn
1919
urlPathTemplate: https://remote-function-host
20-
maxNumBatchRequests: 10000
20+
maxNumBatchRequests: ${MAX_NUM_BATCH_REQUESTS}

statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public void runWith() throws Throwable {
4848
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
4949
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
5050
.withBuildContextFileFromClasspath("certs", "/certs/")
51+
.withModuleGlobalConfiguration("TEST_COMMAND_INTERPRETER_FN", "command-interpreter-fn")
52+
.withModuleGlobalConfiguration("TEST_SERVER_PROTOCOL", "https://")
53+
.withModuleGlobalConfiguration("TEST_REMOTE_FUNCTION_HOST", "remote-function-host")
54+
.withModuleGlobalConfiguration("TEST_NUM_BATCH_REQUESTS", "10000")
55+
.withModuleGlobalConfiguration("TEST_REMOTE_FUNCTION_PORT", "8000")
5156
.dependsOn(remoteFunction);
5257

5358
SmokeRunner.run(parameters, builder);

statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
kind: io.statefun.endpoints.v2/http
1717
spec:
18-
functions: statefun.smoke.e2e/command-interpreter-fn
19-
urlPathTemplate: https://remote-function-host:8000
20-
maxNumBatchRequests: 10000
18+
functions: statefun.smoke.e2e/${TEST_COMMAND_INTERPRETER_FN}
19+
urlPathTemplate: ${TEST_SERVER_PROTOCOL}${TEST_REMOTE_FUNCTION_HOST}:${TEST_REMOTE_FUNCTION_PORT}
20+
maxNumBatchRequests: ${TEST_NUM_BATCH_REQUESTS}
2121
transport:
2222
type: io.statefun.transports.v1/async
2323
trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem

statefun-e2e-tests/statefun-smoke-e2e-js/src/test/java/org/apache/flink/statefun/e2e/smoke/js/SmokeVerificationJsE2E.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void runWith() throws Throwable {
4747
StatefulFunctionsAppContainers.Builder builder =
4848
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
4949
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
50+
.withModuleGlobalConfiguration("REMOTE_FUNCTION_HOST", "remote-function-host")
5051
.dependsOn(remoteFunction);
5152

5253
SmokeRunner.run(parameters, builder);

statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@
1616
kind: io.statefun.endpoints.v2/http
1717
spec:
1818
functions: statefun.smoke.e2e/command-interpreter-fn
19-
urlPathTemplate: http://remote-function-host:8000
19+
urlPathTemplate: http://${REMOTE_FUNCTION_HOST}:8000
2020
maxNumBatchRequests: 10000

statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModule.java

Lines changed: 116 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,30 @@
1818

1919
package org.apache.flink.statefun.flink.core.jsonmodule;
2020

21-
import static org.apache.flink.statefun.flink.core.spi.ExtensionResolverAccessor.getExtensionResolver;
22-
23-
import java.util.List;
24-
import java.util.Map;
25-
import java.util.Objects;
26-
import java.util.stream.Collectors;
27-
import java.util.stream.StreamSupport;
21+
import org.apache.commons.lang3.NotImplementedException;
22+
import org.apache.flink.api.java.utils.ParameterTool;
2823
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
2925
import org.apache.flink.statefun.extensions.ComponentBinder;
3026
import org.apache.flink.statefun.extensions.ComponentJsonObject;
3127
import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
3228
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
3331

34-
public final class RemoteModule implements StatefulFunctionModule {
32+
import java.util.*;
33+
import java.util.function.Function;
34+
import java.util.regex.Matcher;
35+
import java.util.regex.Pattern;
36+
import java.util.stream.Collectors;
37+
import java.util.stream.Stream;
38+
import java.util.stream.StreamSupport;
3539

40+
import static org.apache.flink.statefun.flink.core.spi.ExtensionResolverAccessor.getExtensionResolver;
41+
42+
public final class RemoteModule implements StatefulFunctionModule {
43+
private static final Logger LOG = LoggerFactory.getLogger(RemoteModule.class);
44+
private static final Pattern PLACEHOLDER_REGEX = Pattern.compile("\\$\\{(.*?)\\}");
3645
private final List<JsonNode> componentNodes;
3746

3847
RemoteModule(List<JsonNode> componentNodes) {
@@ -41,8 +50,16 @@ public final class RemoteModule implements StatefulFunctionModule {
4150

4251
@Override
4352
public void configure(Map<String, String> globalConfiguration, Binder moduleBinder) {
53+
Map<String, String> systemPropsThenEnvVarsThenGlobalConfig =
54+
ParameterTool.fromSystemProperties()
55+
.mergeWith(
56+
ParameterTool.fromMap(System.getenv())
57+
.mergeWith(ParameterTool.fromMap(globalConfiguration)))
58+
.toMap();
4459
parseComponentNodes(componentNodes)
45-
.forEach(component -> bindComponent(component, moduleBinder));
60+
.forEach(
61+
component ->
62+
bindComponent(component, moduleBinder, systemPropsThenEnvVarsThenGlobalConfig));
4663
}
4764

4865
private static List<ComponentJsonObject> parseComponentNodes(
@@ -53,10 +70,97 @@ private static List<ComponentJsonObject> parseComponentNodes(
5370
.collect(Collectors.toList());
5471
}
5572

56-
private static void bindComponent(ComponentJsonObject component, Binder moduleBinder) {
73+
private static void bindComponent(
74+
ComponentJsonObject component, Binder moduleBinder, Map<String, String> configuration) {
75+
76+
JsonNode resolvedSpec = valueResolutionFunction(configuration).apply(component.specJsonNode());
77+
ComponentJsonObject resolvedComponent =
78+
new ComponentJsonObject(component.get(), resolvedSpec);
79+
5780
final ExtensionResolver extensionResolver = getExtensionResolver(moduleBinder);
5881
final ComponentBinder componentBinder =
59-
extensionResolver.resolveExtension(component.binderTypename(), ComponentBinder.class);
60-
componentBinder.bind(component, moduleBinder);
82+
extensionResolver.resolveExtension(resolvedComponent.binderTypename(), ComponentBinder.class);
83+
componentBinder.bind(resolvedComponent, moduleBinder);
84+
}
85+
86+
private static Function<JsonNode, JsonNode> valueResolutionFunction(Map<String, String> config) {
87+
return value -> {
88+
if (value.isObject()) {
89+
return resolveObject((ObjectNode) value, config);
90+
} else if (value.isArray()) {
91+
return resolveArray((ArrayNode) value, config);
92+
} else if (value.isValueNode()) {
93+
return resolveValueNode((ValueNode) value, config);
94+
}
95+
96+
LOG.warn(
97+
"Unrecognised type (not in: object, array, value). Skipping ${placeholder} resolution for that node.");
98+
return value;
99+
};
100+
}
101+
102+
private static Function<Map.Entry<String, JsonNode>, AbstractMap.SimpleEntry<String, JsonNode>>
103+
keyValueResolutionFunction(Map<String, String> config) {
104+
return fieldNameValuePair ->
105+
new AbstractMap.SimpleEntry<>(
106+
fieldNameValuePair.getKey(),
107+
valueResolutionFunction(config).apply(fieldNameValuePair.getValue()));
108+
}
109+
110+
private static ValueNode resolveValueNode(ValueNode node, Map<String, String> config) {
111+
StringBuffer stringBuffer = new StringBuffer();
112+
Matcher placeholderMatcher = PLACEHOLDER_REGEX.matcher(node.asText());
113+
boolean placeholderReplaced = false;
114+
115+
while (placeholderMatcher.find()) {
116+
if (config.containsKey(placeholderMatcher.group(1))) {
117+
placeholderMatcher.appendReplacement(stringBuffer, config.get(placeholderMatcher.group(1)));
118+
placeholderReplaced = true;
119+
}
120+
}
121+
122+
if (placeholderReplaced) {
123+
placeholderMatcher.appendTail(stringBuffer);
124+
return new TextNode(stringBuffer.toString());
125+
}
126+
127+
return node;
128+
}
129+
130+
private static ObjectNode resolveObject(ObjectNode node, Map<String, String> config) {
131+
return getFieldStream(node)
132+
.map(keyValueResolutionFunction(config))
133+
.reduce(
134+
new ObjectNode(JsonNodeFactory.instance),
135+
(accumulatedObjectNode, resolvedFieldNameValueTuple) -> {
136+
accumulatedObjectNode.put(
137+
resolvedFieldNameValueTuple.getKey(), resolvedFieldNameValueTuple.getValue());
138+
return accumulatedObjectNode;
139+
},
140+
(objectNode1, objectNode2) -> {
141+
throw new NotImplementedException("This reduce is not used with parallel streams");
142+
});
143+
}
144+
145+
private static ArrayNode resolveArray(ArrayNode node, Map<String, String> config) {
146+
return getElementStream(node)
147+
.map(valueResolutionFunction(config))
148+
.reduce(
149+
new ArrayNode(JsonNodeFactory.instance),
150+
(accumulatedArrayNode, resolvedValue) -> {
151+
accumulatedArrayNode.add(resolvedValue);
152+
return accumulatedArrayNode;
153+
},
154+
(arrayNode1, arrayNode2) -> {
155+
throw new NotImplementedException("This reduce is not used with parallel streams");
156+
});
157+
}
158+
159+
private static Stream<Map.Entry<String, JsonNode>> getFieldStream(ObjectNode node) {
160+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.fields(), 0), false);
161+
}
162+
163+
private static Stream<JsonNode> getElementStream(ArrayNode node) {
164+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.elements(), 0), false);
61165
}
62166
}

0 commit comments

Comments
 (0)