Skip to content

Commit

Permalink
revert the live_update mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: laminar <[email protected]>
  • Loading branch information
tpiperatgod committed Dec 16, 2022
1 parent 83e1330 commit e92efe8
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 41 deletions.
2 changes: 1 addition & 1 deletion docker/pulsar/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ RUN apt-get -y --purge autoremove \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

RUN pip3 install pyyaml==5.4.1 watchdog==2.1.9
RUN pip3 install pyyaml==5.4.1

# Pulsar currently writes to the below directories, assuming the default configuration.
# Note that number 4 is the reason that pulsar components need write access to the /pulsar directory.
Expand Down
36 changes: 1 addition & 35 deletions pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@
from collections import namedtuple
from function_stats import Stats

from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

Log = log.Log
# Equivalent of the InstanceConfig in Java
InstanceConfig = namedtuple('InstanceConfig', 'instance_id function_id function_version function_details max_buffered_tuples')
Expand Down Expand Up @@ -84,8 +81,7 @@ def __init__(self,
secrets_provider,
cluster_name,
state_storage_serviceurl,
config_file,
enable_live_update):
config_file):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
self.user_code = user_code
# set queue size to one since consumers already have internal queues. Just use queue to communicate message from
Expand Down Expand Up @@ -120,8 +116,6 @@ def __init__(self,
"%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)]
self.stats = Stats(self.metrics_labels)
self.config_file = config_file
self.enable_live_update = enable_live_update
self.config_file_observer_thread = None

def health_check(self):
self.last_health_check_ts = time.time()
Expand Down Expand Up @@ -224,11 +218,6 @@ def run(self):
self.execution_thread = threading.Thread(target=self.actual_execution)
self.execution_thread.start()

# Launch a thread to observe the configuration file if required
if self.enable_live_update:
self.config_file_observer_thread = threading.Thread(target=self.setup_config_observer)
self.config_file_observer_thread.start()

# start proccess spawner health check timer
self.last_health_check_ts = time.time()
if self.expected_healthcheck_interval > 0:
Expand Down Expand Up @@ -454,32 +443,9 @@ def get_function_status(self):
status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation)
return status

def config_observer_on_modified(self, event):
# TODO: This function is used for runtime parameters handling logic
# when listening to changes in the configuration file (self.config_file)
Log.debug("Configuration file %s modified detected", self.config_file)

def setup_config_observer(self):
Log.debug("Started Thread for observing the configuration file")
event_handler = PatternMatchingEventHandler(patterns="*", ignore_patterns="", ignore_directories=False,
case_sensitive=True)
event_handler.on_modified = self.config_observer_on_modified
observer = Observer()
observer.schedule(event_handler, self.config_file, recursive=True)
observer.start()
try:
while True:
pass
except Exception as e:
Log.error("Uncaught exception in Python instance (config observer): %s", e)
observer.stop()
observer.join()

def join(self):
self.queue.put(InternalQuitMessage(True), True)
self.execution_thread.join()
if self.config_file_observer_thread:
self.config_file_observer_thread.join()
self.close()

def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ def merge_arguments(args, config_file):
args.extra_dependency_repository = default_config.get("extra_dependency_repository")
if not args.state_storage_serviceurl and default_config.get("state_storage_serviceurl", None):
args.state_storage_serviceurl = default_config.get("state_storage_serviceurl")
if not args.enable_live_update and default_config.get("enable_live_update", None):
args.enable_live_update = default_config.get("enable_live_update")

def main():
# Setup signal handlers
Expand Down Expand Up @@ -135,7 +133,6 @@ def main():
parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url')
parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on')
parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str)
parser.add_argument('--enable_live_update', required=False, default=False, help='Configuration file name', type=bool)

args = parser.parse_args()
merge_arguments(args, args.config_file)
Expand Down Expand Up @@ -266,8 +263,7 @@ def main():
secrets_provider,
args.cluster_name,
state_storage_serviceurl,
args.config_file,
args.enable_live_update)
args.config_file)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)

Expand Down

0 comments on commit e92efe8

Please sign in to comment.