From b1f9e351fa4d5aba197d33cfc0c536516b55b61f Mon Sep 17 00:00:00 2001 From: Paul Gier Date: Thu, 15 Dec 2022 11:05:01 -0600 Subject: [PATCH 1/7] [improve][cli] pulsar-perf: check for invalid CLI options (#18889) --- .../apache/pulsar/testclient/PerformanceConsumer.java | 9 +++++++++ .../apache/pulsar/testclient/PerformanceProducer.java | 9 +++++++++ .../org/apache/pulsar/testclient/PerformanceReader.java | 9 +++++++++ 3 files changed, 27 insertions(+) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index f9eecf2872d25..da05fcb189364 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -223,6 +223,15 @@ public static void main(String[] args) throws Exception { arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName; } + for (String arg : arguments.topic) { + if (arg.startsWith("-")) { + System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', " + + "please use a fully qualified topic name\n", arg, arg); + jc.usage(); + PerfClientUtils.exit(-1); + } + } + if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) { // keep compatibility with the previous version if (arguments.topic.size() == 1) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 3ff78909a687f..6600e3083e0ea 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -292,6 +292,15 @@ public static void main(String[] args) throws Exception { arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName; } + for (String arg : arguments.topics) { + if (arg.startsWith("-")) { + System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', " + + "please use a fully qualified topic name\n", arg, arg); + jc.usage(); + PerfClientUtils.exit(-1); + } + } + if (arguments.topics != null && arguments.topics.size() != arguments.numTopics) { // keep compatibility with the previous version if (arguments.topics.size() == 1) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index cc2d6315eeb1a..6adb0a9ca448e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -122,6 +122,15 @@ public static void main(String[] args) throws Exception { PerfClientUtils.exit(-1); } + for (String arg : arguments.topic) { + if (arg.startsWith("-")) { + System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', " + + "please use a fully qualified topic name\n", arg, arg); + jc.usage(); + PerfClientUtils.exit(-1); + } + } + if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) { // keep compatibility with the previous version if (arguments.topic.size() == 1) { From a35670d83b0b3f2fb63d11e4fb222d7f24099c9a Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Fri, 16 Dec 2022 10:56:55 +0800 Subject: [PATCH 2/7] [improve][txn] fix error in recoverTracker.handleCommittingAndAbortingTransaction() (#18924) Fixes #18923 ### Motivation As described in #18923, recoverTracker.handleCommittingAndAbortingTransaction() fail when TC recover. ### Modifications when transactionLog.replayAsync() finish, complete TC future. --- .../coordinator/impl/MLTransactionMetadataStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index fbb14fa86f3de..53a515ff99164 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -127,9 +127,9 @@ public void replayComplete() { + tcID.toString() + " change state to Ready error when init it")); } else { + completableFuture.complete(MLTransactionMetadataStore.this); recoverTracker.handleCommittingAndAbortingTransaction(); timeoutTracker.start(); - completableFuture.complete(MLTransactionMetadataStore.this); recoverTime.setRecoverEndTime(System.currentTimeMillis()); } } From 11447b7ca0bde6e7f14a8722a2e6b23666fbccfa Mon Sep 17 00:00:00 2001 From: laminar Date: Sun, 27 Nov 2022 17:36:24 +0800 Subject: [PATCH 3/7] Add a mechanism for python functions runner to live update the runtime configuration from the configuration file Signed-off-by: laminar --- .../src/main/python/python_instance.py | 39 ++++++++++++++- .../src/main/python/python_instance_main.py | 50 ++++++++++++++++++- .../instance/src/main/python/util.py | 18 +++++++ 3 files changed, 105 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index f4ad66527e696..b41bd000ce19b 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -46,6 +46,10 @@ from collections import namedtuple from function_stats import Stats +from watchdog.observers import Observer +from watchdog.events import PatternMatchingEventHandler +from google.protobuf import json_format + Log = log.Log # Equivalent of the InstanceConfig in Java InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id function_version function_details max_buffered_tuples') @@ -80,7 +84,9 @@ def __init__(self, pulsar_client, secrets_provider, cluster_name, - state_storage_serviceurl): + state_storage_serviceurl, + config_file, + enable_live_update): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code # set queue size to one since consumers already have internal queues. Just use queue to communicate message from @@ -114,6 +120,9 @@ def __init__(self, instance_id, cluster_name, "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)] self.stats = Stats(self.metrics_labels) + self.config_file = config_file + self.enable_live_update = enable_live_update + self.config_file_observer_thread = None def health_check(self): self.last_health_check_ts = time.time() @@ -216,6 +225,11 @@ def run(self): self.execution_thread = threading.Thread(target=self.actual_execution) self.execution_thread.start() + # Launch a thread to observe the configuration file if required + if self.enable_live_update: + self.config_file_observer_thread = threading.Thread(target=self.setup_config_observer) + self.config_file_observer_thread.start() + # start proccess spawner health check timer self.last_health_check_ts = time.time() if self.expected_healthcheck_interval > 0: @@ -441,9 +455,32 @@ def get_function_status(self): status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation) return status + def config_observer_on_modified(self, event): + # TODO: This function is used for runtime parameters handling logic + # when listening to changes in the configuration file (self.config_file) + pass + + def setup_config_observer(self): + Log.debug("Started Thread for observing the configuration file") + event_handler = PatternMatchingEventHandler(patterns="*", ignore_patterns="", ignore_directories=False, + case_sensitive=True) + event_handler.on_modified = self.config_observer_on_modified + observer = Observer() + observer.schedule(event_handler, self.config_file, recursive=True) + observer.start() + try: + while True: + pass + except Exception as e: + Log.error("Uncaught exception in Python instance (config observer): %s" % e); + observer.stop() + observer.join() + def join(self): self.queue.put(InternalQuitMessage(True), True) self.execution_thread.join() + if self.config_file_observer_thread: + self.config_file_observer_thread.join() self.close() def close(self): diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 3967635365c38..afa8439bdcce7 100755 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -54,6 +54,49 @@ def atexit_function(signo, _frame): Log.info("Interrupted by %d, shutting down" % signo) to_run = False +def merge_arguments(args, config_file): + """ + This function is used to merge arguments passed in via the command line + and those passed in via the configuration file during initialization. + + :param args: arguments passed in via the command line + :param config_file: configuration file name (path) + + During the merge process, the arguments passed in via the command line have higher priority, + so only optional arguments need to be merged. + """ + config = util.read_config(config_file) + if not config: + return + if not args.client_auth_plugin and config.get("client_auth_plugin", None): + args.client_auth_plugin = config.get("client_auth_plugin") + if not args.client_auth_params and config.get("client_auth_params", None): + args.client_auth_params = config.get("client_auth_params") + if not args.use_tls and config.get("use_tls", None): + args.use_tls = config.get("use_tls") + if not args.tls_allow_insecure_connection and config.get("tls_allow_insecure_connection", None): + args.tls_allow_insecure_connection = config.get("tls_allow_insecure_connection") + if not args.hostname_verification_enabled and config.get("hostname_verification_enabled", None): + args.hostname_verification_enabled = config.get("hostname_verification_enabled") + if not args.tls_trust_cert_path and config.get("tls_trust_cert_path", None): + args.tls_trust_cert_path = config.get("tls_trust_cert_path") + if not args.logging_level and config.get("logging_level", None): + args.logging_level = config.get("logging_level") + if not args.secrets_provider and config.get("secrets_provider", None): + args.secrets_provider = config.get("secrets_provider") + if not args.secrets_provider_config and config.get("secrets_provider_config", None): + args.secrets_provider_config = config.get("secrets_provider_config") + if not args.install_usercode_dependencies and config.get("install_usercode_dependencies", None): + args.install_usercode_dependencies = config.get("install_usercode_dependencies") + if not args.dependency_repository and config.get("dependency_repository", None): + args.dependency_repository = config.get("dependency_repository") + if not args.extra_dependency_repository and config.get("extra_dependency_repository", None): + args.extra_dependency_repository = config.get("extra_dependency_repository") + if not args.state_storage_serviceurl and config.get("state_storage_serviceurl", None): + args.state_storage_serviceurl = config.get("state_storage_serviceurl") + if not args.enable_live_update and config.get("enable_live_update", None): + args.enable_live_update = config.get("enable_live_update") + def main(): # Setup signal handlers signal.signal(signal.SIGTERM, atexit_function) @@ -88,8 +131,11 @@ def main(): parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from') parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url') parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on') + parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str) + parser.add_argument('--enable_live_update', required=False, default=False, help='Configuration file name', type=bool) args = parser.parse_args() + merge_arguments(args, args.config_file) function_details = Function_pb2.FunctionDetails() args.function_details = str(args.function_details) if args.function_details[0] == '\'': @@ -216,7 +262,9 @@ def main(): pulsar_client, secrets_provider, args.cluster_name, - state_storage_serviceurl) + state_storage_serviceurl, + args.config_file, + args.enable_live_update) pyinstance.run() server_instance = server.serve(args.port, pyinstance) diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 48ba2f0e6d7cc..9fb977d7db2dc 100755 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -25,6 +25,7 @@ import inspect import sys import importlib +import configparser from threading import Timer from pulsar.functions import serde @@ -80,6 +81,23 @@ def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id): def get_properties(fullyQualifiedName, instanceId): return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)} +def read_config(config_file): + """ + The content of the configuration file is styled as follows: + + [DEFAULT] + parameter1 = value1 + parameter2 = value2 + parameter3 = value3 + ... + """ + if config_file == "": + return None + + cfg = configparser.ConfigParser() + cfg.read(config_file) + return cfg.get("DEFAULT", None) + class FixedTimer(): def __init__(self, t, hFunction, name="timer-thread"): From 2e1f020c8dfb2cf596b3fe05effc97e1a8735dfc Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 28 Nov 2022 09:26:19 +0800 Subject: [PATCH 4/7] install watchdog Signed-off-by: laminar --- docker/pulsar/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile index 2194fc03cb233..b2ed47de360cd 100644 --- a/docker/pulsar/Dockerfile +++ b/docker/pulsar/Dockerfile @@ -79,7 +79,7 @@ RUN apt-get -y --purge autoremove \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -RUN pip3 install pyyaml==5.4.1 +RUN pip3 install pyyaml==5.4.1 watchdog==2.1.9 # Pulsar currently writes to the below directories, assuming the default configuration. # Note that number 4 is the reason that pulsar components need write access to the /pulsar directory. From 307a6560a9e3bc55d4f9f1daa97a9500ea71f4a7 Mon Sep 17 00:00:00 2001 From: laminar Date: Tue, 29 Nov 2022 15:09:07 +0800 Subject: [PATCH 5/7] adjust `util.read_config()` Signed-off-by: laminar --- .../src/main/python/python_instance.py | 4 +- .../src/main/python/python_instance_main.py | 59 ++++++++++--------- .../instance/src/main/python/util.py | 3 +- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index b41bd000ce19b..50b0ceb5302aa 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -48,7 +48,6 @@ from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler -from google.protobuf import json_format Log = log.Log # Equivalent of the InstanceConfig in Java @@ -458,6 +457,7 @@ def get_function_status(self): def config_observer_on_modified(self, event): # TODO: This function is used for runtime parameters handling logic # when listening to changes in the configuration file (self.config_file) + Log.debug("Configuration file %s modified detected" % self.config_file) pass def setup_config_observer(self): @@ -472,7 +472,7 @@ def setup_config_observer(self): while True: pass except Exception as e: - Log.error("Uncaught exception in Python instance (config observer): %s" % e); + Log.error("Uncaught exception in Python instance (config observer): %s" % e) observer.stop() observer.join() diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index afa8439bdcce7..0c9342432d8a4 100755 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -68,34 +68,37 @@ def merge_arguments(args, config_file): config = util.read_config(config_file) if not config: return - if not args.client_auth_plugin and config.get("client_auth_plugin", None): - args.client_auth_plugin = config.get("client_auth_plugin") - if not args.client_auth_params and config.get("client_auth_params", None): - args.client_auth_params = config.get("client_auth_params") - if not args.use_tls and config.get("use_tls", None): - args.use_tls = config.get("use_tls") - if not args.tls_allow_insecure_connection and config.get("tls_allow_insecure_connection", None): - args.tls_allow_insecure_connection = config.get("tls_allow_insecure_connection") - if not args.hostname_verification_enabled and config.get("hostname_verification_enabled", None): - args.hostname_verification_enabled = config.get("hostname_verification_enabled") - if not args.tls_trust_cert_path and config.get("tls_trust_cert_path", None): - args.tls_trust_cert_path = config.get("tls_trust_cert_path") - if not args.logging_level and config.get("logging_level", None): - args.logging_level = config.get("logging_level") - if not args.secrets_provider and config.get("secrets_provider", None): - args.secrets_provider = config.get("secrets_provider") - if not args.secrets_provider_config and config.get("secrets_provider_config", None): - args.secrets_provider_config = config.get("secrets_provider_config") - if not args.install_usercode_dependencies and config.get("install_usercode_dependencies", None): - args.install_usercode_dependencies = config.get("install_usercode_dependencies") - if not args.dependency_repository and config.get("dependency_repository", None): - args.dependency_repository = config.get("dependency_repository") - if not args.extra_dependency_repository and config.get("extra_dependency_repository", None): - args.extra_dependency_repository = config.get("extra_dependency_repository") - if not args.state_storage_serviceurl and config.get("state_storage_serviceurl", None): - args.state_storage_serviceurl = config.get("state_storage_serviceurl") - if not args.enable_live_update and config.get("enable_live_update", None): - args.enable_live_update = config.get("enable_live_update") + default_config = config["DEFAULT"] + if not default_config: + return + if not args.client_auth_plugin and default_config.get("client_auth_plugin", None): + args.client_auth_plugin = default_config.get("client_auth_plugin") + if not args.client_auth_params and default_config.get("client_auth_params", None): + args.client_auth_params = default_config.get("client_auth_params") + if not args.use_tls and default_config.get("use_tls", None): + args.use_tls = default_config.get("use_tls") + if not args.tls_allow_insecure_connection and default_config.get("tls_allow_insecure_connection", None): + args.tls_allow_insecure_connection = default_config.get("tls_allow_insecure_connection") + if not args.hostname_verification_enabled and default_config.get("hostname_verification_enabled", None): + args.hostname_verification_enabled = default_config.get("hostname_verification_enabled") + if not args.tls_trust_cert_path and default_config.get("tls_trust_cert_path", None): + args.tls_trust_cert_path = default_config.get("tls_trust_cert_path") + if not args.logging_level and default_config.get("logging_level", None): + args.logging_level = default_config.get("logging_level") + if not args.secrets_provider and default_config.get("secrets_provider", None): + args.secrets_provider = default_config.get("secrets_provider") + if not args.secrets_provider_config and default_config.get("secrets_provider_config", None): + args.secrets_provider_config = default_config.get("secrets_provider_config") + if not args.install_usercode_dependencies and default_config.get("install_usercode_dependencies", None): + args.install_usercode_dependencies = default_config.get("install_usercode_dependencies") + if not args.dependency_repository and default_config.get("dependency_repository", None): + args.dependency_repository = default_config.get("dependency_repository") + if not args.extra_dependency_repository and default_config.get("extra_dependency_repository", None): + args.extra_dependency_repository = default_config.get("extra_dependency_repository") + if not args.state_storage_serviceurl and default_config.get("state_storage_serviceurl", None): + args.state_storage_serviceurl = default_config.get("state_storage_serviceurl") + if not args.enable_live_update and default_config.get("enable_live_update", None): + args.enable_live_update = default_config.get("enable_live_update") def main(): # Setup signal handlers diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 9fb977d7db2dc..f5868093faea4 100755 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -26,6 +26,7 @@ import sys import importlib import configparser + from threading import Timer from pulsar.functions import serde @@ -96,7 +97,7 @@ def read_config(config_file): cfg = configparser.ConfigParser() cfg.read(config_file) - return cfg.get("DEFAULT", None) + return cfg class FixedTimer(): From 83e133012f8d9ee09352b4db7472c84521315f1c Mon Sep 17 00:00:00 2001 From: laminar Date: Wed, 30 Nov 2022 13:37:36 +0800 Subject: [PATCH 6/7] fix codacy Signed-off-by: laminar --- pulsar-functions/instance/src/main/python/python_instance.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 50b0ceb5302aa..78302ebebd813 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -457,8 +457,7 @@ def get_function_status(self): def config_observer_on_modified(self, event): # TODO: This function is used for runtime parameters handling logic # when listening to changes in the configuration file (self.config_file) - Log.debug("Configuration file %s modified detected" % self.config_file) - pass + Log.debug("Configuration file %s modified detected", self.config_file) def setup_config_observer(self): Log.debug("Started Thread for observing the configuration file") @@ -472,7 +471,7 @@ def setup_config_observer(self): while True: pass except Exception as e: - Log.error("Uncaught exception in Python instance (config observer): %s" % e) + Log.error("Uncaught exception in Python instance (config observer): %s", e) observer.stop() observer.join() From e92efe81273de8f8187786b9529d49471246ba51 Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 12 Dec 2022 16:06:39 +0800 Subject: [PATCH 7/7] revert the live_update mechanism Signed-off-by: laminar --- docker/pulsar/Dockerfile | 2 +- .../src/main/python/python_instance.py | 36 +------------------ .../src/main/python/python_instance_main.py | 6 +--- 3 files changed, 3 insertions(+), 41 deletions(-) diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile index b2ed47de360cd..2194fc03cb233 100644 --- a/docker/pulsar/Dockerfile +++ b/docker/pulsar/Dockerfile @@ -79,7 +79,7 @@ RUN apt-get -y --purge autoremove \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -RUN pip3 install pyyaml==5.4.1 watchdog==2.1.9 +RUN pip3 install pyyaml==5.4.1 # Pulsar currently writes to the below directories, assuming the default configuration. # Note that number 4 is the reason that pulsar components need write access to the /pulsar directory. diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 78302ebebd813..cf53e75d9d0cf 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -46,9 +46,6 @@ from collections import namedtuple from function_stats import Stats -from watchdog.observers import Observer -from watchdog.events import PatternMatchingEventHandler - Log = log.Log # Equivalent of the InstanceConfig in Java InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id function_version function_details max_buffered_tuples') @@ -84,8 +81,7 @@ def __init__(self, secrets_provider, cluster_name, state_storage_serviceurl, - config_file, - enable_live_update): + config_file): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code # set queue size to one since consumers already have internal queues. Just use queue to communicate message from @@ -120,8 +116,6 @@ def __init__(self, "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)] self.stats = Stats(self.metrics_labels) self.config_file = config_file - self.enable_live_update = enable_live_update - self.config_file_observer_thread = None def health_check(self): self.last_health_check_ts = time.time() @@ -224,11 +218,6 @@ def run(self): self.execution_thread = threading.Thread(target=self.actual_execution) self.execution_thread.start() - # Launch a thread to observe the configuration file if required - if self.enable_live_update: - self.config_file_observer_thread = threading.Thread(target=self.setup_config_observer) - self.config_file_observer_thread.start() - # start proccess spawner health check timer self.last_health_check_ts = time.time() if self.expected_healthcheck_interval > 0: @@ -454,32 +443,9 @@ def get_function_status(self): status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation) return status - def config_observer_on_modified(self, event): - # TODO: This function is used for runtime parameters handling logic - # when listening to changes in the configuration file (self.config_file) - Log.debug("Configuration file %s modified detected", self.config_file) - - def setup_config_observer(self): - Log.debug("Started Thread for observing the configuration file") - event_handler = PatternMatchingEventHandler(patterns="*", ignore_patterns="", ignore_directories=False, - case_sensitive=True) - event_handler.on_modified = self.config_observer_on_modified - observer = Observer() - observer.schedule(event_handler, self.config_file, recursive=True) - observer.start() - try: - while True: - pass - except Exception as e: - Log.error("Uncaught exception in Python instance (config observer): %s", e) - observer.stop() - observer.join() - def join(self): self.queue.put(InternalQuitMessage(True), True) self.execution_thread.join() - if self.config_file_observer_thread: - self.config_file_observer_thread.join() self.close() def close(self): diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 0c9342432d8a4..dc03b0d4a4773 100755 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -97,8 +97,6 @@ def merge_arguments(args, config_file): args.extra_dependency_repository = default_config.get("extra_dependency_repository") if not args.state_storage_serviceurl and default_config.get("state_storage_serviceurl", None): args.state_storage_serviceurl = default_config.get("state_storage_serviceurl") - if not args.enable_live_update and default_config.get("enable_live_update", None): - args.enable_live_update = default_config.get("enable_live_update") def main(): # Setup signal handlers @@ -135,7 +133,6 @@ def main(): parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url') parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on') parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str) - parser.add_argument('--enable_live_update', required=False, default=False, help='Configuration file name', type=bool) args = parser.parse_args() merge_arguments(args, args.config_file) @@ -266,8 +263,7 @@ def main(): secrets_provider, args.cluster_name, state_storage_serviceurl, - args.config_file, - args.enable_live_update) + args.config_file) pyinstance.run() server_instance = server.serve(args.port, pyinstance)