From e92efe81273de8f8187786b9529d49471246ba51 Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 12 Dec 2022 16:06:39 +0800 Subject: [PATCH] revert the live_update mechanism Signed-off-by: laminar --- docker/pulsar/Dockerfile | 2 +- .../src/main/python/python_instance.py | 36 +------------------ .../src/main/python/python_instance_main.py | 6 +--- 3 files changed, 3 insertions(+), 41 deletions(-) diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile index b2ed47de360cd..2194fc03cb233 100644 --- a/docker/pulsar/Dockerfile +++ b/docker/pulsar/Dockerfile @@ -79,7 +79,7 @@ RUN apt-get -y --purge autoremove \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -RUN pip3 install pyyaml==5.4.1 watchdog==2.1.9 +RUN pip3 install pyyaml==5.4.1 # Pulsar currently writes to the below directories, assuming the default configuration. # Note that number 4 is the reason that pulsar components need write access to the /pulsar directory. diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 78302ebebd813..cf53e75d9d0cf 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -46,9 +46,6 @@ from collections import namedtuple from function_stats import Stats -from watchdog.observers import Observer -from watchdog.events import PatternMatchingEventHandler - Log = log.Log # Equivalent of the InstanceConfig in Java InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id function_version function_details max_buffered_tuples') @@ -84,8 +81,7 @@ def __init__(self, secrets_provider, cluster_name, state_storage_serviceurl, - config_file, - enable_live_update): + config_file): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code # set queue size to one since consumers already have internal queues. Just use queue to communicate message from @@ -120,8 +116,6 @@ def __init__(self, "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)] self.stats = Stats(self.metrics_labels) self.config_file = config_file - self.enable_live_update = enable_live_update - self.config_file_observer_thread = None def health_check(self): self.last_health_check_ts = time.time() @@ -224,11 +218,6 @@ def run(self): self.execution_thread = threading.Thread(target=self.actual_execution) self.execution_thread.start() - # Launch a thread to observe the configuration file if required - if self.enable_live_update: - self.config_file_observer_thread = threading.Thread(target=self.setup_config_observer) - self.config_file_observer_thread.start() - # start proccess spawner health check timer self.last_health_check_ts = time.time() if self.expected_healthcheck_interval > 0: @@ -454,32 +443,9 @@ def get_function_status(self): status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation) return status - def config_observer_on_modified(self, event): - # TODO: This function is used for runtime parameters handling logic - # when listening to changes in the configuration file (self.config_file) - Log.debug("Configuration file %s modified detected", self.config_file) - - def setup_config_observer(self): - Log.debug("Started Thread for observing the configuration file") - event_handler = PatternMatchingEventHandler(patterns="*", ignore_patterns="", ignore_directories=False, - case_sensitive=True) - event_handler.on_modified = self.config_observer_on_modified - observer = Observer() - observer.schedule(event_handler, self.config_file, recursive=True) - observer.start() - try: - while True: - pass - except Exception as e: - Log.error("Uncaught exception in Python instance (config observer): %s", e) - observer.stop() - observer.join() - def join(self): self.queue.put(InternalQuitMessage(True), True) self.execution_thread.join() - if self.config_file_observer_thread: - self.config_file_observer_thread.join() self.close() def close(self): diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 0c9342432d8a4..dc03b0d4a4773 100755 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -97,8 +97,6 @@ def merge_arguments(args, config_file): args.extra_dependency_repository = default_config.get("extra_dependency_repository") if not args.state_storage_serviceurl and default_config.get("state_storage_serviceurl", None): args.state_storage_serviceurl = default_config.get("state_storage_serviceurl") - if not args.enable_live_update and default_config.get("enable_live_update", None): - args.enable_live_update = default_config.get("enable_live_update") def main(): # Setup signal handlers @@ -135,7 +133,6 @@ def main(): parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url') parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on') parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str) - parser.add_argument('--enable_live_update', required=False, default=False, help='Configuration file name', type=bool) args = parser.parse_args() merge_arguments(args, args.config_file) @@ -266,8 +263,7 @@ def main(): secrets_provider, args.cluster_name, state_storage_serviceurl, - args.config_file, - args.enable_live_update) + args.config_file) pyinstance.run() server_instance = server.serve(args.port, pyinstance)