From 8b018f61d95bbd57919b868950bcceba58dae512 Mon Sep 17 00:00:00 2001 From: Yufei Zhang Date: Fri, 25 Nov 2022 19:30:53 +0800 Subject: [PATCH] 18532: support reading configs from file --- .../functions/instance/InstanceConfig.java | 2 + .../runtime/JavaInstanceConfiguration.java | 219 ++++++++ .../runtime/JavaInstanceStarter.java | 416 +++++++++++--- .../runtime/JavaInstanceStarterTest.java | 515 ++++++++++++++++++ 4 files changed, 1089 insertions(+), 63 deletions(-) create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceConfiguration.java create mode 100644 pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/JavaInstanceStarterTest.java diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java index 1a89505d9bb11..7fac60333db85 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java @@ -49,6 +49,8 @@ public class InstanceConfig { private int metricsPort; private List additionalJavaRuntimeArguments = Collections.emptyList(); + private String configFile; + /** * Get the string representation of {@link #getInstanceId()}. * diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceConfiguration.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceConfiguration.java new file mode 100644 index 0000000000000..b4e162945e1d2 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceConfiguration.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime; + +import java.util.Properties; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.apache.pulsar.common.configuration.Category; +import org.apache.pulsar.common.configuration.FieldContext; +import org.apache.pulsar.common.configuration.PulsarConfiguration; + + +@Getter +@Setter +@ToString +public class JavaInstanceConfiguration implements PulsarConfiguration { + @Category + private static final String CATEGORY_FUNCTIONS = "Functions"; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Function details json" + ) + private String functionDetailsJsonString; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Path to user function jar" + ) + private String jarFile; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Path to the transform function jar" + ) + private String transformFunctionJarFile; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "instanceId used to uniquely identify a function instance" + ) + private Integer instanceId; + + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "functionId used to uniquely identify a function" + ) + private String functionId; + + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "The version of the function" + ) + private String functionVersion; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "serviceUrl of the target Pulsar cluster" + ) + private String pulsarServiceUrl; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "functionId of the transform function" + ) + private String transformFunctionId; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Client auth plugin full classname" + ) + private String clientAuthenticationPlugin; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Client auth parameters" + ) + private String clientAuthenticationParameters; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Use tls connection" + ) + private Boolean useTls; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Allow insecure tls connection" + ) + private Boolean tlsAllowInsecureConnection; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Enable hostname verification" + ) + private Boolean tlsHostNameVerificationEnabled; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "tls trust cert file path" + ) + private String tlsTrustCertFilePath; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "State storage service implementation classname" + ) + private String stateStorageImplClass; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "State storage service url" + ) + private String stateStorageServiceUrl; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Port to listen on" + ) + private Integer port; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Port metrics will be exposed on" + ) + private Integer metricsPort; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Maximum number of tuples to buffer" + ) + private Integer maxBufferedTuples; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Expected interval in seconds between health checks" + ) + private Integer expectedHealthCheckInterval; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "The classname of the secrets provider" + ) + private String secretsProviderClassName; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "The config that needs to be passed to secrets provider" + ) + private String secretsProviderConfig; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "The name of the cluster this instance is running on" + ) + private String clusterName; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "The directory where extraction of nar packages happen" + ) + private String narExtractionDirectory; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Max pending async requests per instance" + ) + private Integer maxPendingAsyncRequests; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Pulsar Web Service Url" + ) + private String webServiceUrl; + + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Whether the pulsar admin client exposed to function context, default is disabled" + ) + private Boolean exposePulsarAdminClientEnabled; + + + @ToString.Exclude + @com.fasterxml.jackson.annotation.JsonIgnore + private Properties properties = new Properties(); + + public Object getProperty(String key) { + return properties.get(key); + } + + @Override + public Properties getProperties() { + return properties; + } + + @Override + public void setProperties(Properties properties) { + this.properties = properties; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java index deff690815d9c..a790a50f32660 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java @@ -18,11 +18,16 @@ */ package org.apache.pulsar.functions.runtime; + +import static java.util.Objects.requireNonNull; import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType; import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameterized; +import com.beust.jcommander.Strings; import com.beust.jcommander.converters.StringConverter; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.google.protobuf.Empty; @@ -31,14 +36,22 @@ import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import io.prometheus.client.exporter.HTTPServer; + +import java.io.FileNotFoundException; +import java.io.IOException; import java.lang.reflect.Type; import java.net.InetSocketAddress; +import java.util.HashSet; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.functions.WindowConfig; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.Reflections; @@ -54,102 +67,108 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.utils.FunctionCommon; +import javax.validation.ValidationException; + @Slf4j public class JavaInstanceStarter implements AutoCloseable { - @Parameter(names = "--function_details", description = "Function details json\n", required = true) - public String functionDetailsJsonString; + @Parameter(names = "--function_details", description = "Function details json\n") + public String functionDetailsJsonString = null; @Parameter( names = "--jar", - description = "Path to Jar\n", + description = "Path to user function jar\n", listConverter = StringConverter.class) - public String jarFile; + public String jarFile = null; @Parameter( names = "--transform_function_jar", - description = "Path to Transform Function Jar\n", + description = "Path to the transform function jar\n", listConverter = StringConverter.class) - public String transformFunctionJarFile; + public String transformFunctionJarFile = null; - @Parameter(names = "--instance_id", description = "Instance Id\n", required = true) - public int instanceId; + @Parameter(names = "--instance_id", description = "instanceId used to uniquely identify a function instance\n") + public Integer instanceId = null; - @Parameter(names = "--function_id", description = "Function Id\n", required = true) - public String functionId; + @Parameter(names = "--function_id", description = "functionId used to uniquely identify a function\n") + public String functionId = null; - @Parameter(names = "--function_version", description = "Function Version\n", required = true) - public String functionVersion; + @Parameter(names = "--function_version", description = "The version of the function\n") + public String functionVersion = null; - @Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true) - public String pulsarServiceUrl; + @Parameter(names = "--pulsar_serviceurl", description = "serviceUrl of the target Pulsar cluster\n") + public String pulsarServiceUrl = null; - @Parameter(names = "--transform_function_id", description = "Transform Function Id\n") - public String transformFunctionId; + @Parameter(names = "--transform_function_id", description = "functionId of the transform function\n") + public String transformFunctionId = null; - @Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n") - public String clientAuthenticationPlugin; + @Parameter(names = "--client_auth_plugin", description = "Client auth plugin full classname\n") + public String clientAuthenticationPlugin = null; - @Parameter(names = "--client_auth_params", description = "Client auth param\n") - public String clientAuthenticationParameters; + @Parameter(names = "--client_auth_params", description = "Client auth parameters\n") + public String clientAuthenticationParameters = null; - @Parameter(names = "--use_tls", description = "Use tls connection\n") - public String useTls = Boolean.FALSE.toString(); + @Parameter(names = "--use_tls", description = "Use tls connection\n", arity = 1) + public Boolean useTls = null; - @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") - public String tlsAllowInsecureConnection = Boolean.TRUE.toString(); + @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n", arity = 1) + public Boolean tlsAllowInsecureConnection = null; - @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") - public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString(); + @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification", arity = 1) + public Boolean tlsHostNameVerificationEnabled = null; @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") - public String tlsTrustCertFilePath; + public String tlsTrustCertFilePath = null; - @Parameter(names = "--state_storage_impl_class", description = "State Storage Service " - + "Implementation class\n", required = false) - public String stateStorageImplClass; + @Parameter(names = "--state_storage_impl_class", description = "State storage service" + + "implementation classname\n") + public String stateStorageImplClass = null; - @Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required = false) - public String stateStorageServiceUrl; + @Parameter(names = "--state_storage_serviceurl", description = "State storage service url\n") + public String stateStorageServiceUrl = null; - @Parameter(names = "--port", description = "Port to listen on\n", required = true) - public int port; + @Parameter(names = "--port", description = "Port to listen on\n") + public Integer port = null; - @Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true) - public int metricsPort; + @Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n") + public Integer metricsPort = null; - @Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true) - public int maxBufferedTuples; + @Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n") + public Integer maxBufferedTuples = null; @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in " - + "seconds between healtchecks", required = true) - public int expectedHealthCheckInterval; + + "seconds between health checks") + public Integer expectedHealthCheckInterval = null; - @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false) - public String secretsProviderClassName; + @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider") + public String secretsProviderClassName = null; @Parameter(names = "--secrets_provider_config", description = "The config that needs to be " - + "passed to secrets provider", required = false) - public String secretsProviderConfig; + + "passed to secrets provider") + public String secretsProviderConfig = null; @Parameter(names = "--cluster_name", description = "The name of the cluster this " - + "instance is running on", required = true) - public String clusterName; + + "instance is running on") + public String clusterName = null; @Parameter(names = "--nar_extraction_directory", description = "The directory where " - + "extraction of nar packages happen", required = false) - public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + + "extraction of nar packages happen") + public String narExtractionDirectory = null; - @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", - required = false) - public int maxPendingAsyncRequests = 1000; + @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance") + public Integer maxPendingAsyncRequests = null; - @Parameter(names = "--web_serviceurl", description = "Pulsar Web Service Url", required = false) + @Parameter(names = "--web_serviceurl", description = "Pulsar Web Service Url") public String webServiceUrl = null; @Parameter(names = "--expose_pulsaradmin", description = "Whether the pulsar admin client " - + "exposed to function context, default is disabled.", required = false) + + "exposed to function context, default is disabled. Providing this flag set value to true") + public Boolean exposePulsarAdminClientEnabled = false; + @Parameter(names = "--config_file", description = "The config file for instance to use, default " + + "use coomand line args") + public String configFile = null; + private Server server; private RuntimeSpawner runtimeSpawner; private ThreadRuntimeFactory containerFactory; @@ -164,9 +183,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL throws Exception { Thread.currentThread().setContextClassLoader(functionInstanceClassLoader); - JCommander jcommander = new JCommander(this); - // parse args by JCommander - jcommander.parse(args); + setConfigs(args); InstanceConfig instanceConfig = new InstanceConfig(); instanceConfig.setFunctionId(functionId); @@ -190,6 +207,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL instanceConfig.setFunctionDetails(functionDetails); instanceConfig.setPort(port); instanceConfig.setMetricsPort(metricsPort); + instanceConfig.setConfigFile(configFile); Map secretsProviderConfigMap = null; if (!StringUtils.isEmpty(secretsProviderConfig)) { @@ -225,9 +243,9 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL stateStorageImplClass, stateStorageServiceUrl, AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin) - .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls)) - .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection)) - .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled)) + .clientAuthenticationParameters(clientAuthenticationParameters).useTls(useTls) + .tlsAllowInsecureConnection(tlsAllowInsecureConnection) + .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader, exposePulsarAdminClientEnabled, webServiceUrl); @@ -281,10 +299,6 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL close(); } - private static boolean isTrue(String param) { - return Boolean.TRUE.toString().equals(param); - } - @Override public void close() { try { @@ -474,4 +488,280 @@ public void healthCheck(com.google.protobuf.Empty request, lastHealthCheckTs = System.currentTimeMillis(); } } + + @VisibleForTesting + protected void completeConfigFromFileIfCmdArgNotProvided() throws IOException { + if (configFile == null) { + // skip the file if not provided + return; + } + + JavaInstanceConfiguration instanceConfiguration; + try { + instanceConfiguration = PulsarConfigurationLoader.create( + configFile, JavaInstanceConfiguration.class); + + } catch (FileNotFoundException e) { + log.warn("The file {} is not found, using command line args only", configFile); + return; + } + + requireNonNull(instanceConfiguration); + // optional String configs + jarFile = useSecondIfFirstIsNull( + jarFile, + instanceConfiguration.getJarFile() + ); + transformFunctionJarFile = useSecondIfFirstIsNull( + transformFunctionJarFile, + instanceConfiguration.getTransformFunctionJarFile() + ); + transformFunctionId = useSecondIfFirstIsNull( + transformFunctionId, + instanceConfiguration.getTransformFunctionId() + ); + clientAuthenticationPlugin = useSecondIfFirstIsNull( + clientAuthenticationPlugin, + instanceConfiguration.getClientAuthenticationPlugin() + ); + clientAuthenticationParameters = useSecondIfFirstIsNull( + clientAuthenticationParameters, + instanceConfiguration.getClientAuthenticationParameters() + ); + tlsTrustCertFilePath = useSecondIfFirstIsNull( + tlsTrustCertFilePath, + instanceConfiguration.getTlsTrustCertFilePath() + ); + stateStorageImplClass = useSecondIfFirstIsNull( + stateStorageImplClass, + instanceConfiguration.getStateStorageImplClass() + ); + stateStorageServiceUrl = useSecondIfFirstIsNull( + stateStorageServiceUrl, + instanceConfiguration.getStateStorageServiceUrl() + ); + secretsProviderClassName = useSecondIfFirstIsNull( + secretsProviderClassName, + instanceConfiguration.getSecretsProviderClassName() + ); + secretsProviderConfig = useSecondIfFirstIsNull( + secretsProviderConfig, + instanceConfiguration.getSecretsProviderConfig() + ); + narExtractionDirectory = useSecondIfFirstIsNull( + narExtractionDirectory, + instanceConfiguration.getNarExtractionDirectory() + ); + webServiceUrl = useSecondIfFirstIsNull( + webServiceUrl, + instanceConfiguration.getWebServiceUrl() + ); + + // optional Integer configs + maxPendingAsyncRequests = useSecondIfFirstIsNull( + maxPendingAsyncRequests, + instanceConfiguration.getMaxPendingAsyncRequests() + ); + + // optional Boolean configs + useTls = useSecondIfFirstIsNull( + useTls, + instanceConfiguration.getUseTls() + ); + tlsAllowInsecureConnection = useSecondIfFirstIsNull( + tlsAllowInsecureConnection, + instanceConfiguration.getTlsAllowInsecureConnection() + ); + tlsHostNameVerificationEnabled = useSecondIfFirstIsNull( + tlsHostNameVerificationEnabled, + instanceConfiguration.getTlsHostNameVerificationEnabled() + ); + + // special arity=0 Boolean config + exposePulsarAdminClientEnabled = exposePulsarAdminClientEnabled + || useSecondIfFirstIsNull(instanceConfiguration.getExposePulsarAdminClientEnabled(), false); + + // required String configs + functionDetailsJsonString = useSecondIfFirstIsNull( + functionDetailsJsonString, + instanceConfiguration.getFunctionDetailsJsonString() + ); + functionId = useSecondIfFirstIsNull( + functionId, + instanceConfiguration.getFunctionId() + ); + functionVersion = useSecondIfFirstIsNull( + functionVersion, + instanceConfiguration.getFunctionVersion() + ); + pulsarServiceUrl = useSecondIfFirstIsNull( + pulsarServiceUrl, + instanceConfiguration.getPulsarServiceUrl() + ); + clusterName = useSecondIfFirstIsNull( + clusterName, + instanceConfiguration.getClusterName() + ); + + // required Integer configs + instanceId = useSecondIfFirstIsNull( + instanceId, + instanceConfiguration.getInstanceId() + ); + port = useSecondIfFirstIsNull( + port, + instanceConfiguration.getPort() + ); + metricsPort = useSecondIfFirstIsNull( + metricsPort, + instanceConfiguration.getMetricsPort() + ); + maxBufferedTuples = useSecondIfFirstIsNull( + maxBufferedTuples, + instanceConfiguration.getMaxBufferedTuples() + ); + expectedHealthCheckInterval = useSecondIfFirstIsNull( + expectedHealthCheckInterval, + instanceConfiguration.getExpectedHealthCheckInterval() + ); + } + + private void validateRequiredConfigs() { + try { + // String configs + requireNonNull(functionDetailsJsonString); + requireNonNull(functionId); + requireNonNull(functionVersion); + requireNonNull(pulsarServiceUrl); + requireNonNull(clusterName); + // Integer configs + requireNonNull(instanceId); + requireNonNull(port); + requireNonNull(metricsPort); + requireNonNull(maxBufferedTuples); + requireNonNull(expectedHealthCheckInterval); + } catch (NullPointerException e) { + throw new IllegalArgumentException(String.format("Required field are not provided in command line args" + + " or file, list of required fields are: %s", + Strings.join(",", requiredConfigFieldNames().toArray())), + e); + } + } + + + /** + * This method is used define default values if a config field + * is not provided in command line args nor file. + */ + private void useDefaultValueIfBothFileAndJCommanderNotProvided() { + + jarFile = useSecondIfFirstIsNull(jarFile, ""); + transformFunctionJarFile = useSecondIfFirstIsNull(transformFunctionJarFile, ""); + transformFunctionId = useSecondIfFirstIsNull(transformFunctionId, ""); + clientAuthenticationPlugin = useSecondIfFirstIsNull(clientAuthenticationPlugin, ""); + clientAuthenticationParameters = useSecondIfFirstIsNull(clientAuthenticationParameters, ""); + tlsTrustCertFilePath = useSecondIfFirstIsNull(tlsTrustCertFilePath, ""); + stateStorageImplClass = useSecondIfFirstIsNull(stateStorageImplClass, ""); + stateStorageServiceUrl = useSecondIfFirstIsNull(stateStorageServiceUrl, ""); + secretsProviderClassName = useSecondIfFirstIsNull(secretsProviderClassName, ""); + secretsProviderConfig = useSecondIfFirstIsNull(secretsProviderConfig, ""); + narExtractionDirectory = useSecondIfFirstIsNull(narExtractionDirectory, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR); + webServiceUrl = useSecondIfFirstIsNull(webServiceUrl, null); + + // Integer + maxPendingAsyncRequests = useSecondIfFirstIsNull(maxPendingAsyncRequests, 1000); + + // Boolean + useTls = useSecondIfFirstIsNull(useTls, Boolean.FALSE); + tlsAllowInsecureConnection = useSecondIfFirstIsNull(tlsAllowInsecureConnection, Boolean.TRUE); + tlsHostNameVerificationEnabled = useSecondIfFirstIsNull(tlsHostNameVerificationEnabled, Boolean.FALSE); + exposePulsarAdminClientEnabled = useSecondIfFirstIsNull(exposePulsarAdminClientEnabled, Boolean.FALSE); + } + + + + // TODO this method can be replaced with requireNonNullElse() after Java 9. + private T useSecondIfFirstIsNull(T valueToTest, T valueIfNull) { + if (Objects.isNull(valueToTest)) { + return valueIfNull; + } else { + return valueToTest; + } + } + + /** + * If a config field is not provided in JCommander, then it will be initialized to null. + * + *

Then we reads from file. If file is not valid or non-exist or empty (cannot load the file), just skip the file + * as if no file are provided. When reading configs from file, first check if the config field is null, only + * set the config field from file if the config field is null (which indicates command line didn't provide this + * config field). This way we can ensure the command line has a higher priority than the file config. + * + *

If a config field is not provided by command nor by file (also indicated by a null value), + * then we will throw an exception if the config field is required, or set it with the default value. + * @param args + * @throws IOException + */ + @VisibleForTesting + protected void setConfigs(String[] args) throws IOException{ + JCommander jcommander = new JCommander(this); + // parse args by JCommander + jcommander.parse(args); + + completeConfigFromFileIfCmdArgNotProvided(); + validateRequiredConfigs(); + useDefaultValueIfBothFileAndJCommanderNotProvided(); + } + + protected static Set requireConfigFieldsNames = new HashSet<>(){{ + // String + add("functionDetailsJsonString"); + add("functionId"); + add("functionVersion"); + add("pulsarServiceUrl"); + add("clusterName"); + add("instanceId"); + add("port"); + add("metricsPort"); + add("maxBufferedTuples"); + add("expectedHealthCheckInterval"); + }}; + + @VisibleForTesting + protected Set requiredConfigFieldNames() { + return requireConfigFieldsNames; + } + + @VisibleForTesting + protected Set optionalConfigFieldNames() { + JCommander jc = new JCommander(this); + return jc.getFields() + .keySet() + .stream() + .map(Parameterized::getName) + .filter(name -> !requireConfigFieldsNames.contains(name)) + .collect(Collectors.toSet()); + } + + @VisibleForTesting + protected Set requiredConfigLongestArgNames() { + JCommander jc = new JCommander(this); + return jc.getFields() + .entrySet() + .stream() + .filter(entry -> requireConfigFieldsNames.contains(entry.getKey().getName())) + .map(entry -> entry.getValue().getLongestName()) + .collect(Collectors.toSet()); + } + + @VisibleForTesting + protected Set optionalConfigLongestArgNames() { + JCommander jc = new JCommander(this); + return jc.getFields() + .entrySet() + .stream() + .filter(entry -> !requireConfigFieldsNames.contains(entry.getKey().getName())) + .map(entry -> entry.getValue().getLongestName()) + .collect(Collectors.toSet()); + } } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/JavaInstanceStarterTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/JavaInstanceStarterTest.java new file mode 100644 index 0000000000000..33474f323159a --- /dev/null +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/JavaInstanceStarterTest.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime; + +import com.beust.jcommander.JCommander; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Paths; +import java.util.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.assertj.core.util.Sets; +import org.bouncycastle.util.Integers; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.commons.lang3.RandomStringUtils.randomNumeric; + +@Slf4j +public class JavaInstanceStarterTest { + + private static String STRING_INVALID_VALUE = "invalidString"; + + private static String INTEGER_INVALID_VALUE = "-1"; + + @Test + public void jCommanderShouldSetValueToNullIfConfigNotProvided() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs("--transform_function_id", STRING_INVALID_VALUE) + .build(); + JCommander jcommander = new JCommander(javaInstanceStarter); + // parse args by JCommander + jcommander.parse(args); + + Assert.assertEquals(javaInstanceStarter.transformFunctionId, STRING_INVALID_VALUE); + // verify if only use JCommander, the setting will set the String and Integer and Boolean type to null + // Required + Assert.assertNull(javaInstanceStarter.expectedHealthCheckInterval); + // Optional + Assert.assertNull(javaInstanceStarter.clientAuthenticationPlugin); + Assert.assertNull(javaInstanceStarter.useTls); + Assert.assertFalse(javaInstanceStarter.exposePulsarAdminClientEnabled); + } + + @Test + public void fileShouldSetValueToNullIfConfigNotProvided() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .useFile() + .build(); + JCommander jcommander = new JCommander(javaInstanceStarter); + // parse args by JCommander + jcommander.parse(args); + javaInstanceStarter.completeConfigFromFileIfCmdArgNotProvided(); + // verify if only use JCommander, the setting will set the String and Integer and Boolean type to null + // Optional + Assert.assertNull(javaInstanceStarter.clientAuthenticationPlugin); + Assert.assertNull(javaInstanceStarter.useTls); + Assert.assertFalse(javaInstanceStarter.exposePulsarAdminClientEnabled); + } + + @Test + public void missingRequiredFieldShouldThrowIllegalArgumentException() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs("--transform_function_id", STRING_INVALID_VALUE) + .build(); + Assert.expectThrows(IllegalArgumentException.class, () -> javaInstanceStarter.setConfigs(args)); + } + + @Test + public void provideRequiredFieldsOnlyShouldSetValueProperlyAndOptionalFieldsAreSetToDefault() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldHasDefaultValue(javaInstanceStarter); + } + @Test + public void useBooleanStringShouldSetArityOneBooleanField() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .withJCommanderConfigs("--use_tls", Boolean.TRUE.toString()) + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + Assert.assertEquals(javaInstanceStarter.useTls, Boolean.TRUE); + } + + @Test + public void useFlagShouldSetArityZeroBooleanField() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .arityZeroBooleanInJCommander() + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + Assert.assertEquals(javaInstanceStarter.exposePulsarAdminClientEnabled, Boolean.TRUE); + } + + @Test + public void invalidConfigFileShouldBeIgnored() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .withJCommanderConfigs("--config_file", "nonexist.conf") + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldHasDefaultValue(javaInstanceStarter); + } + + @Test + public void provideRequiredFieldsOnlyShouldSetValueProperlyAndOptionalFieldsAreSetToDefaultWithEmptyFile() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .useFile() + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldHasDefaultValue(javaInstanceStarter); + } + + @Test + public void provideRequiredFieldsFromFileShouldSuccess() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withFileConfigs(requiredFileConfigs()) + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldHasDefaultValue(javaInstanceStarter); + } + + @Test + public void provideFieldsFromFileAndCommandLineShouldSuccess() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs("--instance_id", INTEGER_INVALID_VALUE) + .withFileConfigs(requiredFileConfigs()) + .exceptFileConfigWithKey("instanceId") + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldHasDefaultValue(javaInstanceStarter); + } + + @Test + public void provideAllConfigsUsingCommandLineShouldSucess() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .withJCommanderConfigs(optionalJCommanderConfigs()) + .arityZeroBooleanInJCommander() + .useFile() + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldsAreSet(javaInstanceStarter); + } + + @Test + public void provideAllConfigsUsingFileShouldSucess() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withFileConfigs(requiredFileConfigs()) + .withFileConfigs(optionalFileConfigs()) + .useFile() + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldsAreSet(javaInstanceStarter); + } + + + @Test + public void commandLineArgsHasHigherPriorityThanFile() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .withJCommanderConfigs(requiredJCommanderConfigs()) + .withJCommanderConfigs(optionalJCommanderConfigs()) + .arityZeroBooleanInJCommander() + .withFileConfigs(setRandomValueForConfigs(requiredFileConfigs())) + .withFileConfigs(setRandomValueForConfigs(optionalFileConfigs())) + .build(); + javaInstanceStarter.setConfigs(args); + assertRequiredFieldsAreSet(javaInstanceStarter); + assertOptionalFieldsAreSet(javaInstanceStarter); + } + + @Test + public void emptyConfigsUsingFileShouldThrowIllegalArugmentException() throws IOException { + JavaInstanceStarter javaInstanceStarter = new JavaInstanceStarter(); + String[] args = new CommandLineArgsBuilder() + .useFile() + .build(); + Assert.assertThrows(IllegalArgumentException.class, () -> javaInstanceStarter.setConfigs(args)); + } + + private void assertRequiredFieldsAreSet(JavaInstanceStarter javaInstanceStarter) { + // Required String + Assert.assertEquals(javaInstanceStarter.functionDetailsJsonString, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.functionId, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.functionVersion, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.pulsarServiceUrl, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.clusterName, STRING_INVALID_VALUE); + // Required Integer + Assert.assertEquals(javaInstanceStarter.instanceId, Integer.valueOf(INTEGER_INVALID_VALUE)); + Assert.assertEquals(javaInstanceStarter.port, Integer.valueOf(INTEGER_INVALID_VALUE)); + Assert.assertEquals(javaInstanceStarter.metricsPort, Integer.valueOf(INTEGER_INVALID_VALUE)); + Assert.assertEquals(javaInstanceStarter.maxBufferedTuples, Integer.valueOf(INTEGER_INVALID_VALUE)); + Assert.assertEquals(javaInstanceStarter.expectedHealthCheckInterval, Integer.valueOf(INTEGER_INVALID_VALUE)); + + } + + private void assertOptionalFieldsAreSet(JavaInstanceStarter javaInstanceStarter) { + // Optional String + Assert.assertEquals(javaInstanceStarter.jarFile, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.transformFunctionJarFile, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.transformFunctionId, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.clientAuthenticationPlugin, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.clientAuthenticationParameters, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.tlsTrustCertFilePath, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.stateStorageImplClass, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.stateStorageServiceUrl, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.secretsProviderClassName, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.secretsProviderConfig, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.narExtractionDirectory, STRING_INVALID_VALUE); + Assert.assertEquals(javaInstanceStarter.webServiceUrl, STRING_INVALID_VALUE); + // Optional Integer + Assert.assertEquals(javaInstanceStarter.maxPendingAsyncRequests, Integer.valueOf(INTEGER_INVALID_VALUE)); + + // Optional Boolean + Assert.assertEquals(javaInstanceStarter.useTls, Boolean.TRUE); + Assert.assertEquals(javaInstanceStarter.tlsAllowInsecureConnection, Boolean.FALSE); + Assert.assertEquals(javaInstanceStarter.tlsHostNameVerificationEnabled, Boolean.TRUE); + Assert.assertEquals(javaInstanceStarter.exposePulsarAdminClientEnabled, Boolean.TRUE); + } + + private void assertOptionalFieldHasDefaultValue(JavaInstanceStarter javaInstanceStarter) { + // Optional String + Assert.assertEquals(javaInstanceStarter.jarFile, ""); + Assert.assertEquals(javaInstanceStarter.transformFunctionJarFile, ""); + Assert.assertEquals(javaInstanceStarter.transformFunctionId, ""); + Assert.assertEquals(javaInstanceStarter.clientAuthenticationPlugin, ""); + Assert.assertEquals(javaInstanceStarter.clientAuthenticationParameters, ""); + Assert.assertEquals(javaInstanceStarter.tlsTrustCertFilePath, ""); + Assert.assertEquals(javaInstanceStarter.stateStorageImplClass, ""); + Assert.assertEquals(javaInstanceStarter.stateStorageServiceUrl, ""); + Assert.assertEquals(javaInstanceStarter.secretsProviderClassName, ""); + Assert.assertEquals(javaInstanceStarter.secretsProviderConfig, ""); + Assert.assertEquals(javaInstanceStarter.narExtractionDirectory, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR); + Assert.assertNull(javaInstanceStarter.webServiceUrl); + // Optional Integer + Assert.assertEquals(javaInstanceStarter.maxPendingAsyncRequests, Integer.valueOf(1000)); + // Optional Boolean + Assert.assertEquals(javaInstanceStarter.useTls, Boolean.FALSE); + Assert.assertEquals(javaInstanceStarter.tlsAllowInsecureConnection, Boolean.TRUE); + Assert.assertEquals(javaInstanceStarter.tlsHostNameVerificationEnabled, Boolean.FALSE); + Assert.assertEquals(javaInstanceStarter.exposePulsarAdminClientEnabled, Boolean.FALSE); + } + + private Map requiredJCommanderConfigs() { + return new HashMap<>(){{ + // String + put("--function_details", STRING_INVALID_VALUE); + put("--function_id", STRING_INVALID_VALUE); + put("--function_version", STRING_INVALID_VALUE); + put("--pulsar_serviceurl", STRING_INVALID_VALUE); + put("--cluster_name", STRING_INVALID_VALUE); + // Integer + put("--instance_id", INTEGER_INVALID_VALUE); + put("--port", INTEGER_INVALID_VALUE); + put("--metrics_port", INTEGER_INVALID_VALUE); + put("--max_buffered_tuples", INTEGER_INVALID_VALUE); + put("--expected_healthcheck_interval", INTEGER_INVALID_VALUE); + }}; + } + + private Map requiredFileConfigs() { + return new HashMap<>(){{ + // String + put("functionDetailsJsonString", STRING_INVALID_VALUE); + put("functionId", STRING_INVALID_VALUE); + put("functionVersion", STRING_INVALID_VALUE); + put("pulsarServiceUrl", STRING_INVALID_VALUE); + put("clusterName", STRING_INVALID_VALUE); + // Integer + put("instanceId", INTEGER_INVALID_VALUE); + put("port", INTEGER_INVALID_VALUE); + put("metricsPort", INTEGER_INVALID_VALUE); + put("maxBufferedTuples", INTEGER_INVALID_VALUE); + put("expectedHealthCheckInterval", INTEGER_INVALID_VALUE); + }}; + } + + private Map optionalJCommanderConfigs() { + return new HashMap<>(){{ + // String + put("--jar", STRING_INVALID_VALUE); + put("--transform_function_jar", STRING_INVALID_VALUE); + put("--transform_function_id", STRING_INVALID_VALUE); + put("--client_auth_plugin", STRING_INVALID_VALUE); + put("--client_auth_params", STRING_INVALID_VALUE); + put("--tls_trust_cert_path", STRING_INVALID_VALUE); + put("--state_storage_impl_class", STRING_INVALID_VALUE); + put("--state_storage_serviceurl", STRING_INVALID_VALUE); + put("--secrets_provider", STRING_INVALID_VALUE); + put("--secrets_provider_config", STRING_INVALID_VALUE); + put("--nar_extraction_directory", STRING_INVALID_VALUE); + put("--web_serviceurl", STRING_INVALID_VALUE); + // Integer + put("--pending_async_requests", INTEGER_INVALID_VALUE); + // Boolean + put("--use_tls", Boolean.TRUE.toString()); + put("--tls_allow_insecure", Boolean.FALSE.toString()); + put("--hostname_verification_enabled", Boolean.TRUE.toString()); + }}; + } + + private Map optionalFileConfigs() { + return new HashMap<>(){{ + // String + put("jarFile", STRING_INVALID_VALUE); + put("transformFunctionJarFile", STRING_INVALID_VALUE); + put("transformFunctionId", STRING_INVALID_VALUE); + put("clientAuthenticationPlugin", STRING_INVALID_VALUE); + put("clientAuthenticationParameters", STRING_INVALID_VALUE); + put("tlsTrustCertFilePath", STRING_INVALID_VALUE); + put("stateStorageImplClass", STRING_INVALID_VALUE); + put("stateStorageServiceUrl", STRING_INVALID_VALUE); + put("secretsProviderClassName", STRING_INVALID_VALUE); + put("secretsProviderConfig", STRING_INVALID_VALUE); + put("narExtractionDirectory", STRING_INVALID_VALUE); + put("webServiceUrl", STRING_INVALID_VALUE); + // Integer + put("maxPendingAsyncRequests", INTEGER_INVALID_VALUE); + // Boolean, value different from the default value + put("useTls", Boolean.TRUE.toString()); + put("tlsAllowInsecureConnection", Boolean.FALSE.toString()); + put("tlsHostNameVerificationEnabled", Boolean.TRUE.toString()); + put("exposePulsarAdminClientEnabled", Boolean.TRUE.toString()); + }}; + } + + @Test + public void testRequiredConfigFieldNames() { + JavaInstanceStarter jc = new JavaInstanceStarter(); + assertSetContainsSame(jc.requiredConfigFieldNames(), requiredFileConfigs().keySet()); + } + + @Test + public void testOptionalConfigFieldNames() { + JavaInstanceStarter jc = new JavaInstanceStarter(); + Set expected = Sets.newHashSet(); + expected.add("configFile"); + expected.addAll(optionalFileConfigs().keySet()); + assertSetContainsSame(jc.optionalConfigFieldNames(), expected); + } + + @Test + public void testRequiredConfigLongestArgNames() { + JavaInstanceStarter jc = new JavaInstanceStarter(); + assertSetContainsSame(jc.requiredConfigLongestArgNames(), requiredJCommanderConfigs().keySet()); + } + + @Test + public void testOptionalConfigLongestArgName() { + JavaInstanceStarter jc = new JavaInstanceStarter(); + Set expected = Sets.newHashSet(); + expected.add("--config_file"); + expected.add("--expose_pulsaradmin"); + expected.addAll(optionalJCommanderConfigs().keySet()); + assertSetContainsSame(jc.optionalConfigLongestArgNames(), expected); + } + + private void assertSetContainsSame(Set actual, Set expected) { + Assert.assertTrue(actual.containsAll(expected)); + Assert.assertTrue(expected.containsAll(actual)); + } + + private static String writeConfigsToFile(String name, Map configs) throws IOException { + File runnerConfFile = File.createTempFile(name, ".conf"); + runnerConfFile.deleteOnExit(); + try (FileWriter fileWriter = new FileWriter(runnerConfFile, Charset.defaultCharset())) { + for (Map.Entry entry : configs.entrySet()) { + fileWriter.write(entry.getKey() + "=" + entry.getValue() + System.getProperty("line.separator")); + } + } + return Paths.get(runnerConfFile.toString()).toAbsolutePath().toString(); + } + + private Map setRandomValueForConfigs(Map configs) { + configs.replaceAll((k, v) -> { + if (Objects.equals(v, INTEGER_INVALID_VALUE)) { + return String.valueOf(1); + } else if (Objects.equals(v, STRING_INVALID_VALUE)) { + return "valueFromFile"; + } else { + return "true"; + } + }); + return configs; + } + + public static class CommandLineArgsBuilder { + private Map jCommanderConfigs; + + private Map fileConfigs; + + private Boolean useFile = false; + + private Boolean arityZeroBooleanInJCommander = false; + + private CommandLineArgsBuilder(){ + this.jCommanderConfigs = new HashMap<>(); + this.fileConfigs = new HashMap<>(); + }; + + public CommandLineArgsBuilder newBuilder() { + return new CommandLineArgsBuilder(); + } + + + public CommandLineArgsBuilder withJCommanderConfigs(Map jCommanderConfigs) { + this.jCommanderConfigs.putAll(jCommanderConfigs); + return this; + } + + public CommandLineArgsBuilder withJCommanderConfigs(String key, String value) { + this.jCommanderConfigs.put(key, value); + return this; + } + + public CommandLineArgsBuilder exceptJCommanderConfigWithKey(String key) { + this.jCommanderConfigs.remove(key); + return this; + } + + public CommandLineArgsBuilder withFileConfigs(Map jCommanderConfigs) { + this.useFile = true; + this.fileConfigs.putAll(jCommanderConfigs); + return this; + } + + public CommandLineArgsBuilder withFileConfigs(String key, String value) { + this.useFile = true; + this.fileConfigs.put(key, value); + return this; + } + + public CommandLineArgsBuilder exceptFileConfigWithKey(String key) { + this.fileConfigs.remove(key); + return this; + } + + public CommandLineArgsBuilder useFile() { + this.useFile = true; + return this; + } + + public CommandLineArgsBuilder arityZeroBooleanInJCommander() { + this.arityZeroBooleanInJCommander = true; + return this; + } + + public String[] build() throws IOException { + List result = new ArrayList<>(); + for (Map.Entry entry : jCommanderConfigs.entrySet()) { + result.add(entry.getKey()); + result.add(entry.getValue()); + } + + if (arityZeroBooleanInJCommander) { + result.add("--expose_pulsaradmin"); + } + + if (useFile) { + String fileName = randomAlphabetic(7); + String completeFileName = writeConfigsToFile(fileName, fileConfigs); + result.add("--config_file"); + result.add(completeFileName); + } + return result.toArray(new String[0]); + } + } + +}