diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index f4ad66527e696..cf53e75d9d0cf 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -80,7 +80,8 @@ def __init__(self, pulsar_client, secrets_provider, cluster_name, - state_storage_serviceurl): + state_storage_serviceurl, + config_file): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code # set queue size to one since consumers already have internal queues. Just use queue to communicate message from @@ -114,6 +115,7 @@ def __init__(self, instance_id, cluster_name, "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)] self.stats = Stats(self.metrics_labels) + self.config_file = config_file def health_check(self): self.last_health_check_ts = time.time() diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 3967635365c38..dc03b0d4a4773 100755 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -54,6 +54,50 @@ def atexit_function(signo, _frame): Log.info("Interrupted by %d, shutting down" % signo) to_run = False +def merge_arguments(args, config_file): + """ + This function is used to merge arguments passed in via the command line + and those passed in via the configuration file during initialization. + + :param args: arguments passed in via the command line + :param config_file: configuration file name (path) + + During the merge process, the arguments passed in via the command line have higher priority, + so only optional arguments need to be merged. + """ + config = util.read_config(config_file) + if not config: + return + default_config = config["DEFAULT"] + if not default_config: + return + if not args.client_auth_plugin and default_config.get("client_auth_plugin", None): + args.client_auth_plugin = default_config.get("client_auth_plugin") + if not args.client_auth_params and default_config.get("client_auth_params", None): + args.client_auth_params = default_config.get("client_auth_params") + if not args.use_tls and default_config.get("use_tls", None): + args.use_tls = default_config.get("use_tls") + if not args.tls_allow_insecure_connection and default_config.get("tls_allow_insecure_connection", None): + args.tls_allow_insecure_connection = default_config.get("tls_allow_insecure_connection") + if not args.hostname_verification_enabled and default_config.get("hostname_verification_enabled", None): + args.hostname_verification_enabled = default_config.get("hostname_verification_enabled") + if not args.tls_trust_cert_path and default_config.get("tls_trust_cert_path", None): + args.tls_trust_cert_path = default_config.get("tls_trust_cert_path") + if not args.logging_level and default_config.get("logging_level", None): + args.logging_level = default_config.get("logging_level") + if not args.secrets_provider and default_config.get("secrets_provider", None): + args.secrets_provider = default_config.get("secrets_provider") + if not args.secrets_provider_config and default_config.get("secrets_provider_config", None): + args.secrets_provider_config = default_config.get("secrets_provider_config") + if not args.install_usercode_dependencies and default_config.get("install_usercode_dependencies", None): + args.install_usercode_dependencies = default_config.get("install_usercode_dependencies") + if not args.dependency_repository and default_config.get("dependency_repository", None): + args.dependency_repository = default_config.get("dependency_repository") + if not args.extra_dependency_repository and default_config.get("extra_dependency_repository", None): + args.extra_dependency_repository = default_config.get("extra_dependency_repository") + if not args.state_storage_serviceurl and default_config.get("state_storage_serviceurl", None): + args.state_storage_serviceurl = default_config.get("state_storage_serviceurl") + def main(): # Setup signal handlers signal.signal(signal.SIGTERM, atexit_function) @@ -88,8 +132,10 @@ def main(): parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from') parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url') parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on') + parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str) args = parser.parse_args() + merge_arguments(args, args.config_file) function_details = Function_pb2.FunctionDetails() args.function_details = str(args.function_details) if args.function_details[0] == '\'': @@ -216,7 +262,8 @@ def main(): pulsar_client, secrets_provider, args.cluster_name, - state_storage_serviceurl) + state_storage_serviceurl, + args.config_file) pyinstance.run() server_instance = server.serve(args.port, pyinstance) diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 48ba2f0e6d7cc..f5868093faea4 100755 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -25,6 +25,8 @@ import inspect import sys import importlib +import configparser + from threading import Timer from pulsar.functions import serde @@ -80,6 +82,23 @@ def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id): def get_properties(fullyQualifiedName, instanceId): return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)} +def read_config(config_file): + """ + The content of the configuration file is styled as follows: + + [DEFAULT] + parameter1 = value1 + parameter2 = value2 + parameter3 = value3 + ... + """ + if config_file == "": + return None + + cfg = configparser.ConfigParser() + cfg.read(config_file) + return cfg + class FixedTimer(): def __init__(self, t, hFunction, name="timer-thread"): diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index f9eecf2872d25..da05fcb189364 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -223,6 +223,15 @@ public static void main(String[] args) throws Exception { arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName; } + for (String arg : arguments.topic) { + if (arg.startsWith("-")) { + System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', " + + "please use a fully qualified topic name\n", arg, arg); + jc.usage(); + PerfClientUtils.exit(-1); + } + } + if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) { // keep compatibility with the previous version if (arguments.topic.size() == 1) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 3ff78909a687f..6600e3083e0ea 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -292,6 +292,15 @@ public static void main(String[] args) throws Exception { arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName; } + for (String arg : arguments.topics) { + if (arg.startsWith("-")) { + System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', " + + "please use a fully qualified topic name\n", arg, arg); + jc.usage(); + PerfClientUtils.exit(-1); + } + } + if (arguments.topics != null && arguments.topics.size() != arguments.numTopics) { // keep compatibility with the previous version if (arguments.topics.size() == 1) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index cc2d6315eeb1a..6adb0a9ca448e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -122,6 +122,15 @@ public static void main(String[] args) throws Exception { PerfClientUtils.exit(-1); } + for (String arg : arguments.topic) { + if (arg.startsWith("-")) { + System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', " + + "please use a fully qualified topic name\n", arg, arg); + jc.usage(); + PerfClientUtils.exit(-1); + } + } + if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) { // keep compatibility with the previous version if (arguments.topic.size() == 1) { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index fbb14fa86f3de..53a515ff99164 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -127,9 +127,9 @@ public void replayComplete() { + tcID.toString() + " change state to Ready error when init it")); } else { + completableFuture.complete(MLTransactionMetadataStore.this); recoverTracker.handleCommittingAndAbortingTransaction(); timeoutTracker.start(); - completableFuture.complete(MLTransactionMetadataStore.this); recoverTime.setRecoverEndTime(System.currentTimeMillis()); } }