From 6506f9bfd4896de489d29fa7ec46bd11d9e6d94e Mon Sep 17 00:00:00 2001 From: laminar Date: Wed, 8 Feb 2023 09:17:01 +0800 Subject: [PATCH] [improve][fn] support reading config options from file in Function Python Runner (#18951) Signed-off-by: laminar --- .../src/main/python/python_instance.py | 4 +- .../src/main/python/python_instance_main.py | 107 ++++++++++++++---- .../instance/src/main/python/util.py | 19 ++++ .../test/python/test_python_instance_main.py | 69 +++++++++++ .../python/test_python_runtime_config.ini | 27 +++++ 5 files changed, 205 insertions(+), 21 deletions(-) create mode 100644 pulsar-functions/instance/src/test/python/test_python_instance_main.py create mode 100644 pulsar-functions/instance/src/test/python/test_python_runtime_config.ini diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index f4ad66527e696..cf53e75d9d0cf 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -80,7 +80,8 @@ def __init__(self, pulsar_client, secrets_provider, cluster_name, - state_storage_serviceurl): + state_storage_serviceurl, + config_file): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code # set queue size to one since consumers already have internal queues. Just use queue to communicate message from @@ -114,6 +115,7 @@ def __init__(self, instance_id, cluster_name, "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)] self.stats = Stats(self.metrics_labels) + self.config_file = config_file def health_check(self): self.last_health_check_ts = time.time() diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 3967635365c38..2d6520b2e99e9 100755 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -49,47 +49,113 @@ to_run = True Log = log.Log + def atexit_function(signo, _frame): global to_run Log.info("Interrupted by %d, shutting down" % signo) to_run = False -def main(): - # Setup signal handlers - signal.signal(signal.SIGTERM, atexit_function) - signal.signal(signal.SIGHUP, atexit_function) - signal.signal(signal.SIGINT, atexit_function) +def generate_arguments_parser(): parser = argparse.ArgumentParser(description='Pulsar Functions Python Instance') - parser.add_argument('--function_details', required=True, help='Function Details Json String') - parser.add_argument('--py', required=True, help='Full Path of Function Code File') - parser.add_argument('--instance_id', required=True, help='Instance Id') - parser.add_argument('--function_id', required=True, help='Function Id') - parser.add_argument('--function_version', required=True, help='Function Version') - parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url') + parser.add_argument('--function_details', required=False, help='Function Details Json String') + parser.add_argument('--py', required=False, help='Full Path of Function Code File') + parser.add_argument('--instance_id', required=False, help='Instance Id') + parser.add_argument('--function_id', required=False, help='Function Id') + parser.add_argument('--function_version', required=False, help='Function Version') + parser.add_argument('--pulsar_serviceurl', required=False, help='Pulsar Service Url') parser.add_argument('--client_auth_plugin', required=False, help='Client authentication plugin') parser.add_argument('--client_auth_params', required=False, help='Client authentication params') parser.add_argument('--use_tls', required=False, help='Use tls') parser.add_argument('--tls_allow_insecure_connection', required=False, help='Tls allow insecure connection') parser.add_argument('--hostname_verification_enabled', required=False, help='Enable hostname verification') parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust cert file path') - parser.add_argument('--port', required=True, help='Instance Port', type=int) - parser.add_argument('--metrics_port', required=True, help="Port metrics will be exposed on", type=int) - parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples') - parser.add_argument('--logging_directory', required=True, help='Logging Directory') - parser.add_argument('--logging_file', required=True, help='Log file name') + parser.add_argument('--port', required=False, help='Instance Port', type=int) + parser.add_argument('--metrics_port', required=False, help="Port metrics will be exposed on", type=int) + parser.add_argument('--max_buffered_tuples', required=False, help='Maximum number of Buffered tuples') + parser.add_argument('--logging_directory', required=False, help='Logging Directory') + parser.add_argument('--logging_file', required=False, help='Log file name') parser.add_argument('--logging_level', required=False, help='Logging level') - parser.add_argument('--logging_config_file', required=True, help='Config file for logging') - parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int) + parser.add_argument('--logging_config_file', required=False, help='Config file for logging') + parser.add_argument('--expected_healthcheck_interval', required=False, help='Expected time in seconds between health checks', type=int) parser.add_argument('--secrets_provider', required=False, help='The classname of the secrets provider') parser.add_argument('--secrets_provider_config', required=False, help='The config that needs to be passed to secrets provider') parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool) parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from') parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from') parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url') - parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on') + parser.add_argument('--cluster_name', required=False, help='The name of the cluster this instance is running on') + parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str) + return parser + +def merge_arguments(args, config_file): + """ + This function is used to merge arguments passed in via the command line + and those passed in via the configuration file during initialization. + + :param args: arguments passed in via the command line + :param config_file: configuration file name (path) + + During the merge process, the arguments passed in via the command line have higher priority, + so only optional arguments need to be merged. + """ + if config_file is None: + return + config = util.read_config(config_file) + if not config: + return + default_config = config["DEFAULT"] + if not default_config: + return + for k, v in vars(args).items(): + if k == "config_file": + continue + if not v and default_config.get(k, None): + vars(args)[k] = default_config.get(k) + + +def validate_arguments(args): + """ + This function is used to verify the merged arguments, + mainly to check whether the mandatory arguments are assigned properly. + + :param args: arguments after merging + """ + mandatory_args_map = { + "function_details": args.function_details, + "py": args.py, + "instance_id": args.instance_id, + "function_id": args.function_id, + "function_version": args.function_version, + "pulsar_serviceurl": args.pulsar_serviceurl, + "port": args.port, + "metrics_port": args.metrics_port, + "max_buffered_tuples": args.max_buffered_tuples, + "logging_directory": args.logging_directory, + "logging_file": args.logging_file, + "logging_config_file": args.logging_config_file, + "expected_healthcheck_interval": args.expected_healthcheck_interval, + "cluster_name": args.cluster_name + } + missing_args = [] + for k, v in mandatory_args_map.items(): + if v is None: + missing_args.append(k) + if missing_args: + print("The following arguments are required:", missing_args) + sys.exit(1) + + +def main(): + # Setup signal handlers + signal.signal(signal.SIGTERM, atexit_function) + signal.signal(signal.SIGHUP, atexit_function) + signal.signal(signal.SIGINT, atexit_function) + parser = generate_arguments_parser() args = parser.parse_args() + merge_arguments(args, args.config_file) + validate_arguments(args) function_details = Function_pb2.FunctionDetails() args.function_details = str(args.function_details) if args.function_details[0] == '\'': @@ -216,7 +282,8 @@ def main(): pulsar_client, secrets_provider, args.cluster_name, - state_storage_serviceurl) + state_storage_serviceurl, + args.config_file) pyinstance.run() server_instance = server.serve(args.port, pyinstance) diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 48ba2f0e6d7cc..f5868093faea4 100755 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -25,6 +25,8 @@ import inspect import sys import importlib +import configparser + from threading import Timer from pulsar.functions import serde @@ -80,6 +82,23 @@ def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id): def get_properties(fullyQualifiedName, instanceId): return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)} +def read_config(config_file): + """ + The content of the configuration file is styled as follows: + + [DEFAULT] + parameter1 = value1 + parameter2 = value2 + parameter3 = value3 + ... + """ + if config_file == "": + return None + + cfg = configparser.ConfigParser() + cfg.read(config_file) + return cfg + class FixedTimer(): def __init__(self, t, hFunction, name="timer-thread"): diff --git a/pulsar-functions/instance/src/test/python/test_python_instance_main.py b/pulsar-functions/instance/src/test/python/test_python_instance_main.py new file mode 100644 index 0000000000000..af248691919a3 --- /dev/null +++ b/pulsar-functions/instance/src/test/python/test_python_instance_main.py @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# DEPENDENCIES: unittest2 +import python_instance_main + +import os +import log +import unittest + +class TestContextImpl(unittest.TestCase): + + def Any(cls): + class Any(cls): + def __eq__(self, other): + return True + return Any() + + def setUp(self): + log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini") + + def test_arguments(self): + parser = python_instance_main.generate_arguments_parser() + argv = [ + "--function_details", "test_function_details", + "--py", "test_py", + "--instance_id", "test_instance_id", + "--function_id", "test_function_id", + "--function_version", "test_function_version", + "--pulsar_serviceurl", "test_pulsar_serviceurl", + "--client_auth_plugin", "test_client_auth_plugin", + "--client_auth_params", "test_client_auth_params", + "--tls_allow_insecure_connection", "true", + "--hostname_verification_enabled", "true", + "--tls_trust_cert_path", "test_tls_trust_cert_path", + "--port", "1000", + "--metrics_port", "1001", + "--max_buffered_tuples", "100", + "--config_file", "test_python_runtime_config.ini" + ] + args = parser.parse_args(argv) + python_instance_main.merge_arguments(args, args.config_file) + # argument from command line test + self.assertEqual(args.function_details, "test_function_details") + # argument from config file test + self.assertEqual(args.use_tls, "true") + # argument read priority test + self.assertEqual(args.port, 1000) + # mandatory argument test + self.assertEqual(args.expected_healthcheck_interval, "50") + # optional argument test + self.assertEqual(args.secrets_provider, None) diff --git a/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini b/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini new file mode 100644 index 0000000000000..8e17264717883 --- /dev/null +++ b/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +[DEFAULT] +port=5000 +metrics_port=5001 +use_tls=true +logging_directory=test_logging_directory +logging_file=test_logging_file +logging_config_file=test_logging_config_file +expected_healthcheck_interval=50 \ No newline at end of file