2020
2121import static org .apache .flink .statefun .flink .core .spi .ExtensionResolverAccessor .getExtensionResolver ;
2222
23- import java .util .List ;
24- import java .util .Map ;
25- import java .util .Objects ;
23+ import java .util .*;
24+ import java .util .function .Function ;
25+ import java .util .regex .Matcher ;
26+ import java .util .regex .Pattern ;
2627import java .util .stream .Collectors ;
28+ import java .util .stream .Stream ;
2729import java .util .stream .StreamSupport ;
30+ import org .apache .commons .lang3 .NotImplementedException ;
31+ import org .apache .flink .api .java .utils .ParameterTool ;
2832import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
33+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .*;
2934import org .apache .flink .statefun .extensions .ComponentBinder ;
3035import org .apache .flink .statefun .extensions .ComponentJsonObject ;
3136import org .apache .flink .statefun .flink .core .spi .ExtensionResolver ;
3237import org .apache .flink .statefun .sdk .spi .StatefulFunctionModule ;
38+ import org .slf4j .Logger ;
39+ import org .slf4j .LoggerFactory ;
3340
3441public final class RemoteModule implements StatefulFunctionModule {
35-
42+ private static final Logger LOG = LoggerFactory .getLogger (RemoteModule .class );
43+ private static final Pattern PLACEHOLDER_REGEX = Pattern .compile ("\\ $\\ {(.*?)\\ }" );
3644 private final List <JsonNode > componentNodes ;
3745
3846 RemoteModule (List <JsonNode > componentNodes ) {
@@ -41,8 +49,16 @@ public final class RemoteModule implements StatefulFunctionModule {
4149
4250 @ Override
4351 public void configure (Map <String , String > globalConfiguration , Binder moduleBinder ) {
52+ Map <String , String > systemPropsThenEnvVarsThenGlobalConfig =
53+ ParameterTool .fromSystemProperties ()
54+ .mergeWith (
55+ ParameterTool .fromMap (System .getenv ())
56+ .mergeWith (ParameterTool .fromMap (globalConfiguration )))
57+ .toMap ();
4458 parseComponentNodes (componentNodes )
45- .forEach (component -> bindComponent (component , moduleBinder ));
59+ .forEach (
60+ component ->
61+ bindComponent (component , moduleBinder , systemPropsThenEnvVarsThenGlobalConfig ));
4662 }
4763
4864 private static List <ComponentJsonObject > parseComponentNodes (
@@ -53,10 +69,102 @@ private static List<ComponentJsonObject> parseComponentNodes(
5369 .collect (Collectors .toList ());
5470 }
5571
56- private static void bindComponent (ComponentJsonObject component , Binder moduleBinder ) {
72+ private static void bindComponent (
73+ ComponentJsonObject component , Binder moduleBinder , Map <String , String > configuration ) {
74+
75+ JsonNode resolvedSpec = valueResolutionFunction (configuration ).apply (component .specJsonNode ());
76+ ComponentJsonObject resolvedComponent = new ComponentJsonObject (component .get (), resolvedSpec );
77+
5778 final ExtensionResolver extensionResolver = getExtensionResolver (moduleBinder );
5879 final ComponentBinder componentBinder =
59- extensionResolver .resolveExtension (component .binderTypename (), ComponentBinder .class );
60- componentBinder .bind (component , moduleBinder );
80+ extensionResolver .resolveExtension (
81+ resolvedComponent .binderTypename (), ComponentBinder .class );
82+ componentBinder .bind (resolvedComponent , moduleBinder );
83+ }
84+
85+ private static Function <JsonNode , JsonNode > valueResolutionFunction (Map <String , String > config ) {
86+ return value -> {
87+ if (value .isObject ()) {
88+ return resolveObject ((ObjectNode ) value , config );
89+ } else if (value .isArray ()) {
90+ return resolveArray ((ArrayNode ) value , config );
91+ } else if (value .isValueNode ()) {
92+ return resolveValueNode ((ValueNode ) value , config );
93+ }
94+
95+ LOG .warn (
96+ "Unrecognised type (not in: object, array, value). Skipping ${placeholder} resolution for that node." );
97+ return value ;
98+ };
99+ }
100+
101+ private static Function <Map .Entry <String , JsonNode >, AbstractMap .SimpleEntry <String , JsonNode >>
102+ keyValueResolutionFunction (Map <String , String > config ) {
103+ return fieldNameValuePair ->
104+ new AbstractMap .SimpleEntry <>(
105+ fieldNameValuePair .getKey (),
106+ valueResolutionFunction (config ).apply (fieldNameValuePair .getValue ()));
107+ }
108+
109+ private static ValueNode resolveValueNode (ValueNode node , Map <String , String > config ) {
110+ StringBuffer stringBuffer = new StringBuffer ();
111+ Matcher placeholderMatcher = PLACEHOLDER_REGEX .matcher (node .asText ());
112+ boolean placeholderReplaced = false ;
113+
114+ while (placeholderMatcher .find ()) {
115+ if (config .containsKey (placeholderMatcher .group (1 ))) {
116+ placeholderMatcher .appendReplacement (stringBuffer , config .get (placeholderMatcher .group (1 )));
117+ placeholderReplaced = true ;
118+ } else {
119+ throw new IllegalArgumentException (
120+ String .format (
121+ "Could not resolve placeholder '%s'. An entry for this key was not found in the configuration." ,
122+ node .asText ()));
123+ }
124+ }
125+
126+ if (placeholderReplaced ) {
127+ placeholderMatcher .appendTail (stringBuffer );
128+ return new TextNode (stringBuffer .toString ());
129+ }
130+
131+ return node ;
132+ }
133+
134+ private static ObjectNode resolveObject (ObjectNode node , Map <String , String > config ) {
135+ return getFieldStream (node )
136+ .map (keyValueResolutionFunction (config ))
137+ .reduce (
138+ new ObjectNode (JsonNodeFactory .instance ),
139+ (accumulatedObjectNode , resolvedFieldNameValueTuple ) -> {
140+ accumulatedObjectNode .put (
141+ resolvedFieldNameValueTuple .getKey (), resolvedFieldNameValueTuple .getValue ());
142+ return accumulatedObjectNode ;
143+ },
144+ (objectNode1 , objectNode2 ) -> {
145+ throw new NotImplementedException ("This reduce is not used with parallel streams" );
146+ });
147+ }
148+
149+ private static ArrayNode resolveArray (ArrayNode node , Map <String , String > config ) {
150+ return getElementStream (node )
151+ .map (valueResolutionFunction (config ))
152+ .reduce (
153+ new ArrayNode (JsonNodeFactory .instance ),
154+ (accumulatedArrayNode , resolvedValue ) -> {
155+ accumulatedArrayNode .add (resolvedValue );
156+ return accumulatedArrayNode ;
157+ },
158+ (arrayNode1 , arrayNode2 ) -> {
159+ throw new NotImplementedException ("This reduce is not used with parallel streams" );
160+ });
161+ }
162+
163+ private static Stream <Map .Entry <String , JsonNode >> getFieldStream (ObjectNode node ) {
164+ return StreamSupport .stream (Spliterators .spliteratorUnknownSize (node .fields (), 0 ), false );
165+ }
166+
167+ private static Stream <JsonNode > getElementStream (ArrayNode node ) {
168+ return StreamSupport .stream (Spliterators .spliteratorUnknownSize (node .elements (), 0 ), false );
61169 }
62170}
0 commit comments