From 4953486baf202f7359fedbe29c1f0d95232e4caf Mon Sep 17 00:00:00 2001 From: Hetang Modi <62056057+hetangmodi-crest@users.noreply.github.com> Date: Mon, 2 Dec 2024 20:16:34 +0530 Subject: [PATCH] feat: moved methods from splunktalib (#415) [ADDON-73693](https://splunk.atlassian.net/browse/ADDON-73693) Migrated all the methods of `splunktalib` which were used in `splunktaucclib` to `solnlib` --- solnlib/concurrent/concurrent_executor.py | 102 +++++++ solnlib/concurrent/process_pool.py | 75 +++++ solnlib/concurrent/thread_pool.py | 347 ++++++++++++++++++++++ solnlib/modular_input/modinput.py | 161 ++++++++++ solnlib/rest.py | 95 ++++++ solnlib/schedule/job.py | 122 ++++++++ solnlib/schedule/scheduler.py | 162 ++++++++++ solnlib/utils.py | 26 ++ 8 files changed, 1090 insertions(+) create mode 100644 solnlib/concurrent/concurrent_executor.py create mode 100644 solnlib/concurrent/process_pool.py create mode 100644 solnlib/concurrent/thread_pool.py create mode 100644 solnlib/modular_input/modinput.py create mode 100644 solnlib/rest.py create mode 100644 solnlib/schedule/job.py create mode 100644 solnlib/schedule/scheduler.py diff --git a/solnlib/concurrent/concurrent_executor.py b/solnlib/concurrent/concurrent_executor.py new file mode 100644 index 00000000..6d3c7521 --- /dev/null +++ b/solnlib/concurrent/concurrent_executor.py @@ -0,0 +1,102 @@ +# +# Copyright 2024 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Concurrent executor provides concurrent executing function either in a +thread pool or a process pool.""" + +import solnlib.concurrent.process_pool as pp +import solnlib.concurrent.thread_pool as tp + + +class ConcurrentExecutor: + def __init__(self, config): + """ + :param config: dict like object, contains thread_min_size (int), + thread_max_size (int), daemonize_thread (bool), + process_size (int) + """ + + self._io_executor = tp.ThreadPool( + config.get("thread_min_size", 0), + config.get("thread_max_size", 0), + config.get("task_queue_size", 1024), + config.get("daemonize_thread", True), + ) + self._compute_executor = None + if config.get("process_size", 0): + self._compute_executor = pp.ProcessPool(config.get("process_size", 0)) + + def start(self): + self._io_executor.start() + + def tear_down(self): + self._io_executor.tear_down() + if self._compute_executor is not None: + self._compute_executor.tear_down() + + def run_io_func_sync(self, func, args=(), kwargs=None): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :return whatever the func returns + """ + + return self._io_executor.apply(func, args, kwargs) + + def run_io_func_async(self, func, args=(), kwargs=None, callback=None): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :calllback: when func is done and without exception, call the callback + :return whatever the func returns + """ + + return self._io_executor.apply_async(func, args, kwargs, callback) + + def enqueue_io_funcs(self, funcs, block=True): + """run jobs in a fire and forget way, no result will be handled over to + clients. + + :param funcs: tuple/list-like or generator like object, func shall be + callable + """ + + return self._io_executor.enqueue_funcs(funcs, block) + + def run_compute_func_sync(self, func, args=(), kwargs={}): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :return whatever the func returns + """ + + assert self._compute_executor is not None + return self._compute_executor.apply(func, args, kwargs) + + def run_compute_func_async(self, func, args=(), kwargs={}, callback=None): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :calllback: when func is done and without exception, call the callback + :return whatever the func returns + """ + + assert self._compute_executor is not None + return self._compute_executor.apply_async(func, args, kwargs, callback) diff --git a/solnlib/concurrent/process_pool.py b/solnlib/concurrent/process_pool.py new file mode 100644 index 00000000..723d98e0 --- /dev/null +++ b/solnlib/concurrent/process_pool.py @@ -0,0 +1,75 @@ +# +# Copyright 2024 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A wrapper of multiprocessing.pool.""" + +import multiprocessing + +import logging + + +class ProcessPool: + """A simple wrapper of multiprocessing.pool.""" + + def __init__(self, size=0, maxtasksperchild=10000): + if size <= 0: + size = multiprocessing.cpu_count() + self.size = size + self._pool = multiprocessing.Pool( + processes=size, maxtasksperchild=maxtasksperchild + ) + self._stopped = False + + def tear_down(self): + """Tear down the pool.""" + + if self._stopped: + logging.info("ProcessPool has already stopped.") + return + self._stopped = True + + self._pool.close() + self._pool.join() + logging.info("ProcessPool stopped.") + + def apply(self, func, args=(), kwargs={}): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :return whatever the func returns + """ + + if self._stopped: + logging.info("ProcessPool has already stopped.") + return None + + return self._pool.apply(func, args, kwargs) + + def apply_async(self, func, args=(), kwargs={}, callback=None): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :callback: when func is done without exception, call this callack + :return whatever the func returns + """ + + if self._stopped: + logging.info("ProcessPool has already stopped.") + return None + + return self._pool.apply_async(func, args, kwargs, callback) diff --git a/solnlib/concurrent/thread_pool.py b/solnlib/concurrent/thread_pool.py new file mode 100644 index 00000000..2d0b5bcd --- /dev/null +++ b/solnlib/concurrent/thread_pool.py @@ -0,0 +1,347 @@ +# +# Copyright 2024 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A simple thread pool implementation.""" + +import multiprocessing +import queue +import threading +import traceback +from time import time +import logging + + +class ThreadPool: + """A simple thread pool implementation.""" + + _high_watermark = 0.2 + _resize_window = 10 + + def __init__(self, min_size=1, max_size=128, task_queue_size=1024, daemon=True): + assert task_queue_size + + if not min_size or min_size <= 0: + min_size = multiprocessing.cpu_count() + + if not max_size or max_size <= 0: + max_size = multiprocessing.cpu_count() * 8 + + self._min_size = min_size + self._max_size = max_size + self._daemon = daemon + + self._work_queue = queue.Queue(task_queue_size) + self._thrs = [] + for _ in range(min_size): + thr = threading.Thread(target=self._run) + self._thrs.append(thr) + self._admin_queue = queue.Queue() + self._admin_thr = threading.Thread(target=self._do_admin) + self._last_resize_time = time() + self._last_size = min_size + self._lock = threading.Lock() + self._occupied_threads = 0 + self._count_lock = threading.Lock() + self._started = False + + def start(self): + """Start threads in the pool.""" + + with self._lock: + if self._started: + return + self._started = True + + for thr in self._thrs: + thr.daemon = self._daemon + thr.start() + + self._admin_thr.start() + logging.info("ThreadPool started.") + + def tear_down(self): + """Tear down thread pool.""" + + with self._lock: + if not self._started: + return + self._started = False + + for thr in self._thrs: + self._work_queue.put(None, block=True) + + self._admin_queue.put(None) + + if not self._daemon: + logging.info("Wait for threads to stop.") + for thr in self._thrs: + thr.join() + self._admin_thr.join() + + logging.info("ThreadPool stopped.") + + def enqueue_funcs(self, funcs, block=True): + """run jobs in a fire and forget way, no result will be handled over to + clients. + + :param funcs: tuple/list-like or generator like object, func shall be + callable + """ + + if not self._started: + logging.info("ThreadPool has already stopped.") + return + + for func in funcs: + self._work_queue.put(func, block) + + def apply_async(self, func, args=(), kwargs=None, callback=None): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :callback: when func is done and without exception, call the callback + :return AsyncResult, clients can poll or wait the result through it + """ + + if not self._started: + logging.info("ThreadPool has already stopped.") + return None + + res = AsyncResult(func, args, kwargs, callback) + self._work_queue.put(res) + return res + + def apply(self, func, args=(), kwargs=None): + """ + :param func: callable + :param args: free params + :param kwargs: named params + :return whatever the func returns + """ + + if not self._started: + logging.info("ThreadPool has already stopped.") + return None + + res = self.apply_async(func, args, kwargs) + return res.get() + + def size(self): + return self._last_size + + def resize(self, new_size): + """Resize the pool size, spawn or destroy threads if necessary.""" + + if new_size <= 0: + return + + if self._lock.locked() or not self._started: + logging.info( + "Try to resize thread pool during the tear " "down process, do nothing" + ) + return + + with self._lock: + self._remove_exited_threads_with_lock() + size = self._last_size + self._last_size = new_size + if new_size > size: + for _ in range(new_size - size): + thr = threading.Thread(target=self._run) + thr.daemon = self._daemon + thr.start() + self._thrs.append(thr) + elif new_size < size: + for _ in range(size - new_size): + self._work_queue.put(None) + logging.info("Finished ThreadPool resizing. New size=%d", new_size) + + def _remove_exited_threads_with_lock(self): + """Join the exited threads last time when resize was called.""" + + joined_thrs = set() + for thr in self._thrs: + if not thr.is_alive(): + try: + if not thr.daemon: + thr.join(timeout=0.5) + joined_thrs.add(thr.ident) + except RuntimeError: + pass + + if joined_thrs: + live_thrs = [] + for thr in self._thrs: + if thr.ident not in joined_thrs: + live_thrs.append(thr) + self._thrs = live_thrs + + def _do_resize_according_to_loads(self): + if ( + self._last_resize_time + and time() - self._last_resize_time < self._resize_window + ): + return + + thr_size = self._last_size + free_thrs = thr_size - self._occupied_threads + work_size = self._work_queue.qsize() + + logging.debug( + "current_thr_size=%s, free_thrs=%s, work_size=%s", + thr_size, + free_thrs, + work_size, + ) + if work_size and work_size > free_thrs: + if thr_size < self._max_size: + thr_size = min(thr_size * 2, self._max_size) + self.resize(thr_size) + elif free_thrs > 0: + free = free_thrs * 1.0 + if free / thr_size >= self._high_watermark and free_thrs >= 2: + # 20 % thrs are idle, tear down half of the idle ones + thr_size = thr_size - int(free_thrs // 2) + if thr_size > self._min_size: + self.resize(thr_size) + self._last_resize_time = time() + + def _do_admin(self): + admin_q = self._admin_queue + resize_win = self._resize_window + while 1: + try: + wakup = admin_q.get(timeout=resize_win + 1) + except queue.Empty: + self._do_resize_according_to_loads() + continue + + if wakup is None: + break + else: + self._do_resize_according_to_loads() + logging.info( + "ThreadPool admin thread=%s stopped.", threading.current_thread().getName() + ) + + def _run(self): + """Threads callback func, run forever to handle jobs from the job + queue.""" + + work_queue = self._work_queue + count_lock = self._count_lock + while 1: + logging.debug("Going to get job") + func = work_queue.get() + if func is None: + break + + if not self._started: + break + + logging.debug("Going to exec job") + with count_lock: + self._occupied_threads += 1 + + try: + func() + except Exception: + logging.error(traceback.format_exc()) + + with count_lock: + self._occupied_threads -= 1 + + logging.debug("Done with exec job") + logging.info("Thread work_queue_size=%d", work_queue.qsize()) + + logging.debug("Worker thread %s stopped.", threading.current_thread().getName()) + + +class AsyncResult: + def __init__(self, func, args, kwargs, callback): + self._func = func + self._args = args + self._kwargs = kwargs + self._callback = callback + self._q = queue.Queue() + + def __call__(self): + try: + if self._args and self._kwargs: + res = self._func(*self._args, **self._kwargs) + elif self._args: + res = self._func(*self._args) + elif self._kwargs: + res = self._func(**self._kwargs) + else: + res = self._func() + except Exception as e: + self._q.put(e) + return + else: + self._q.put(res) + + if self._callback is not None: + self._callback() + + def get(self, timeout=None): + """Return the result when it arrives. + + If timeout is not None and the result does not arrive within + timeout seconds then multiprocessing.TimeoutError is raised. If + the remote call raised an exception then that exception will be + reraised by get(). + """ + + try: + res = self._q.get(timeout=timeout) + except queue.Empty: + raise multiprocessing.TimeoutError("Timed out") + + if isinstance(res, Exception): + raise res + return res + + def wait(self, timeout=None): + """Wait until the result is available or until timeout seconds pass.""" + + try: + res = self._q.get(timeout=timeout) + except queue.Empty: + pass + else: + self._q.put(res) + + def ready(self): + """Return whether the call has completed.""" + + return len(self._q) + + def successful(self): + """Return whether the call completed without raising an exception. + + Will raise AssertionError if the result is not ready. + """ + + if not self.ready(): + raise AssertionError("Function is not ready") + res = self._q.get() + self._q.put(res) + + if isinstance(res, Exception): + return False + return True diff --git a/solnlib/modular_input/modinput.py b/solnlib/modular_input/modinput.py new file mode 100644 index 00000000..1e6b4e4d --- /dev/null +++ b/solnlib/modular_input/modinput.py @@ -0,0 +1,161 @@ +# +# Copyright 2024 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import subprocess +import sys +import traceback + +import solnlib.splunkenv as sp +import logging + + +def _parse_modinput_configs(root, outer_block, inner_block): + """When user splunkd spawns modinput script to do config check or run. + + + + localhost.localdomain + https://127.0.0.1:8089 + xxxyyyzzz + ckpt_dir + + + 60 + localhost.localdomain + snow + 10 + + ... + + + + When user create an stanza through data input on WebUI + + + + localhost.localdomain + https://127.0.0.1:8089 + xxxyyyzzz + ckpt_dir + + 60 + + localhost.localdomain + snow + 10 + + + """ + + confs = root.getElementsByTagName(outer_block) + if not confs: + logging.error("Invalid config, missing %s section", outer_block) + raise Exception(f"Invalid config, missing {outer_block} section") + + configs = [] + stanzas = confs[0].getElementsByTagName(inner_block) + for stanza in stanzas: + config = {} + stanza_name = stanza.getAttribute("name") + if not stanza_name: + logging.error("Invalid config, missing name") + raise Exception("Invalid config, missing name") + + config["name"] = stanza_name + params = stanza.getElementsByTagName("param") + for param in params: + name = param.getAttribute("name") + if ( + name + and param.firstChild + and param.firstChild.nodeType == param.firstChild.TEXT_NODE + ): + config[name] = param.firstChild.data + configs.append(config) + return configs + + +def parse_modinput_configs(config_str): + """ + @config_str: modinput XML configuration feed by splunkd + @return: meta_config and stanza_config + """ + + import defusedxml.minidom as xdm + + meta_configs = { + "server_host": None, + "server_uri": None, + "session_key": None, + "checkpoint_dir": None, + } + root = xdm.parseString(config_str) + doc = root.documentElement + for tag in meta_configs.keys(): + nodes = doc.getElementsByTagName(tag) + if not nodes: + logging.error("Invalid config, missing %s section", tag) + raise Exception("Invalid config, missing %s section", tag) + + if nodes[0].firstChild and nodes[0].firstChild.nodeType == nodes[0].TEXT_NODE: + meta_configs[tag] = nodes[0].firstChild.data + else: + logging.error("Invalid config, expect text ndoe") + raise Exception("Invalid config, expect text ndoe") + + if doc.nodeName == "input": + configs = _parse_modinput_configs(doc, "configuration", "stanza") + else: + configs = _parse_modinput_configs(root, "items", "item") + return meta_configs, configs + + +def get_modinput_configs_from_cli(modinput, modinput_stanza=None): + """ + @modinput: modinput name + @modinput_stanza: modinput stanza name, for multiple instance only + """ + + assert modinput + + splunkbin = sp.get_splunk_bin() + cli = [splunkbin, "cmd", "splunkd", "print-modinput-config", modinput] + if modinput_stanza: + cli.append(modinput_stanza) + + out, err = subprocess.Popen( + cli, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ).communicate() + if err: + logging.error("Failed to get modinput configs with error: %s", err) + return None, None + else: + return parse_modinput_configs(out) + + +def get_modinput_config_str_from_stdin(): + """Get modinput from stdin which is feed by splunkd.""" + + try: + return sys.stdin.read(5000) + except Exception: + logging.error(traceback.format_exc()) + raise + + +def get_modinput_configs_from_stdin(): + config_str = get_modinput_config_str_from_stdin() + return parse_modinput_configs(config_str) diff --git a/solnlib/rest.py b/solnlib/rest.py new file mode 100644 index 00000000..4ff3abe1 --- /dev/null +++ b/solnlib/rest.py @@ -0,0 +1,95 @@ +# +# Copyright 2024 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import urllib.parse +from traceback import format_exc +from typing import Optional + +import requests + +import logging + + +def splunkd_request( + splunkd_uri, + session_key, + method="GET", + headers=None, + data=None, + timeout=300, + retry=1, + verify=False, +) -> Optional[requests.Response]: + + headers = headers if headers is not None else {} + headers["Authorization"] = f"Splunk {session_key}" + content_type = headers.get("Content-Type") + if not content_type: + content_type = headers.get("content-type") + + if not content_type: + content_type = "application/x-www-form-urlencoded" + headers["Content-Type"] = content_type + + if data is not None: + if content_type == "application/json": + data = json.dumps(data) + else: + data = urllib.parse.urlencode(data) + + msg_temp = "Failed to send rest request=%s, errcode=%s, reason=%s" + resp = None + for _ in range(retry): + try: + resp = requests.request( + method=method, + url=splunkd_uri, + data=data, + headers=headers, + timeout=timeout, + verify=verify, + ) + except Exception: + logging.error(msg_temp, splunkd_uri, "unknown", format_exc()) + else: + if resp.status_code not in (200, 201): + if not (method == "GET" and resp.status_code == 404): + logging.debug( + msg_temp, splunkd_uri, resp.status_code, code_to_msg(resp) + ) + else: + return resp + else: + return resp + + +def code_to_msg(response: requests.Response): + code_msg_tbl = { + 400: f"Request error. reason={response.text}", + 401: "Authentication failure, invalid access credentials.", + 402: "In-use license disables this feature.", + 403: "Insufficient permission.", + 404: "Requested endpoint does not exist.", + 409: f"Invalid operation for this endpoint. reason={response.text}", + 500: f"Unspecified internal server error. reason={response.text}", + 503: ( + "Feature is disabled in the configuration file. " + "reason={}".format(response.text) + ), + } + + return code_msg_tbl.get(response.status_code, response.text) diff --git a/solnlib/schedule/job.py b/solnlib/schedule/job.py new file mode 100644 index 00000000..0e2e5fc7 --- /dev/null +++ b/solnlib/schedule/job.py @@ -0,0 +1,122 @@ +# +# Copyright 2024 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import threading +import time + + +class Job: + """Timer wraps the callback and timestamp related stuff.""" + + _ident = 0 + _lock = threading.Lock() + + def __init__(self, func, job_props, interval, when=None, job_id=None): + """ + @job_props: dict like object + @func: execution function + @interval: execution interval + @when: seconds from epoch + @job_id: a unique id for the job + """ + + self._props = job_props + self._func = func + if when is None: + self._when = time.time() + else: + self._when = when + self._interval = interval + + if job_id is not None: + self._id = job_id + else: + with Job._lock: + self._id = Job._ident + 1 + Job._ident = Job._ident + 1 + self._stopped = False + + def ident(self): + return self._id + + def get_interval(self): + return self._interval + + def set_interval(self, interval): + self._interval = interval + + def get_expiration(self): + return self._when + + def set_initial_due_time(self, when): + if self._when is None: + self._when = when + + def update_expiration(self): + self._when += self._interval + + def get(self, key, default): + return self._props.get(key, default) + + def get_props(self): + return self._props + + def set_props(self, props): + self._props = props + + def __cmp__(self, other): + if other is None: + return 1 + + self_k = (self.get_expiration(), self.ident()) + other_k = (other.get_expiration(), other.ident()) + + if self_k == other_k: + return 0 + elif self_k < other_k: + return -1 + else: + return 1 + + def __eq__(self, other): + return isinstance(other, Job) and (self.ident() == other.ident()) + + def __lt__(self, other): + return self.__cmp__(other) == -1 + + def __gt__(self, other): + return self.__cmp__(other) == 1 + + def __ne__(self, other): + return not self.__eq__(other) + + def __le__(self, other): + return self.__lt__(other) or self.__eq__(other) + + def __ge__(self, other): + return self.__gt__(other) or self.__eq__(other) + + def __hash__(self): + return self.ident() + + def __call__(self): + self._func(self) + + def stop(self): + self._stopped = True + + def stopped(self): + return self._stopped diff --git a/solnlib/schedule/scheduler.py b/solnlib/schedule/scheduler.py new file mode 100644 index 00000000..fb1de53f --- /dev/null +++ b/solnlib/schedule/scheduler.py @@ -0,0 +1,162 @@ +# +# Copyright 2024 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import queue +import random +import threading +from time import time + +import logging + + +class Scheduler: + """A simple scheduler which schedules the periodic or once event.""" + + import sortedcontainers as sc + + max_delay_time = 60 + + def __init__(self): + self._jobs = Scheduler.sc.SortedSet() + self._wakeup_q = queue.Queue() + self._lock = threading.Lock() + self._thr = threading.Thread(target=self._do_jobs) + # FIXME: the `daemon` property HAS to be passed in init() call ONLY, + # the below attribute setting is of incorrect spelling + self._thr.deamon = True + self._started = False + + def start(self): + """Start the schduler which will start the internal thread for + scheduling jobs. + + Please do tear_down when doing cleanup + """ + + if self._started: + logging.info("Scheduler already started.") + return + self._started = True + + self._thr.start() + + def tear_down(self): + """Stop the schduler which will stop the internal thread for scheduling + jobs.""" + + if not self._started: + logging.info("Scheduler already tear down.") + return + + self._wakeup_q.put(True) + + def _do_jobs(self): + while 1: + (sleep_time, jobs) = self.get_ready_jobs() + self._do_execution(jobs) + try: + done = self._wakeup_q.get(timeout=sleep_time) + except queue.Empty: + pass + else: + if done: + break + self._started = False + logging.info("Scheduler exited.") + + def get_ready_jobs(self): + """ + @return: a 2 element tuple. The first element is the next ready + duration. The second element is ready jobs list + """ + + now = time() + ready_jobs = [] + sleep_time = 1 + + with self._lock: + job_set = self._jobs + total_jobs = len(job_set) + for job in job_set: + if job.get_expiration() <= now: + ready_jobs.append(job) + + if ready_jobs: + del job_set[: len(ready_jobs)] + + for job in ready_jobs: + if job.get_interval() != 0 and not job.stopped(): + # repeated job, calculate next due time and enqueue + job.update_expiration() + job_set.add(job) + + if job_set: + sleep_time = job_set[0].get_expiration() - now + if sleep_time < 0: + logging.warn("Scheduler satuation, sleep_time=%s", sleep_time) + sleep_time = 0.1 + + if ready_jobs: + logging.info( + "Get %d ready jobs, next duration is %f, " + "and there are %s jobs scheduling", + len(ready_jobs), + sleep_time, + total_jobs, + ) + + ready_jobs.sort(key=lambda job: job.get("priority", 0), reverse=True) + return (sleep_time, ready_jobs) + + def add_jobs(self, jobs): + with self._lock: + now = time() + job_set = self._jobs + for job in jobs: + delay_time = random.randrange(0, self.max_delay_time) + job.set_initial_due_time(now + delay_time) + job_set.add(job) + self._wakeup() + + def update_jobs(self, jobs): + with self._lock: + job_set = self._jobs + for njob in jobs: + job_set.discard(njob) + job_set.add(njob) + self._wakeup() + + def remove_jobs(self, jobs): + with self._lock: + job_set = self._jobs + for njob in jobs: + njob.stop() + job_set.discard(njob) + self._wakeup() + + def number_of_jobs(self): + with self._lock: + return len(self._jobs) + + def disable_randomization(self): + self.max_delay_time = 1 + + def _wakeup(self): + self._wakeup_q.put(None) + + def _do_execution(self, jobs): + for job in jobs: + job() diff --git a/solnlib/utils.py b/solnlib/utils.py index 6aaa8777..4ff4fdc0 100644 --- a/solnlib/utils.py +++ b/solnlib/utils.py @@ -193,3 +193,29 @@ def extract_http_scheme_host_port(http_url: str) -> Tuple: if not http_info.scheme or not http_info.hostname or not http_info.port: raise ValueError(http_url + " is not in http(s)://hostname:port format") return http_info.scheme, http_info.hostname, http_info.port + + +def get_appname_from_path(absolute_path): + """Gets name of the app from its path. + + Arguments: + absolute_path: path of app + + Returns: + """ + absolute_path = os.path.normpath(absolute_path) + parts = absolute_path.split(os.path.sep) + parts.reverse() + for key in ("apps", "peer-apps", "manager-apps"): + try: + idx = parts.index(key) + except ValueError: + continue + else: + try: + if parts[idx + 1] == "etc": + return parts[idx - 1] + except IndexError: + pass + continue + return "-"