diff --git a/releases/unreleased/reduced-the-number-of-connections-to-sortinghat.yml b/releases/unreleased/reduced-the-number-of-connections-to-sortinghat.yml new file mode 100644 index 00000000..c74df8c9 --- /dev/null +++ b/releases/unreleased/reduced-the-number-of-connections-to-sortinghat.yml @@ -0,0 +1,9 @@ +--- +title: Reduced the number of connections to SortingHat +category: performance +author: Quan Zhou +issue: null +notes: > + Mordred makes a lot of connections to the SortingHat server + which could cause the uWSGI queue to fill up. When the uWSGI + queue is full, Mordred cannot connect to the SortingHat server. diff --git a/sirmordred/sirmordred.py b/sirmordred/sirmordred.py index 32036f7f..8d0f768b 100755 --- a/sirmordred/sirmordred.py +++ b/sirmordred/sirmordred.py @@ -50,6 +50,7 @@ from sirmordred.task_manager import TasksManager from sirmordred.task_panels import TaskPanels, TaskPanelsMenu from sirmordred.task_projects import TaskProjects +from sortinghat.cli.client import SortingHatClient logger = logging.getLogger(__name__) @@ -198,7 +199,7 @@ def _split_tasks(tasks_cls): repos_backend = self._get_repos_by_backend() for backend in repos_backend: # Start new Threads and add them to the threads list to complete - t = TasksManager(backend_tasks, backend, stopper, self.config, small_delay) + t = TasksManager(backend_tasks, backend, stopper, self.config, self.client, small_delay) threads.append(t) t.start() @@ -206,7 +207,7 @@ def _split_tasks(tasks_cls): if len(global_tasks) > 0: # FIXME timer is applied to all global_tasks, does it make sense? # All tasks are executed in the same thread sequentially - gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, big_delay) + gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, self.client, big_delay) threads.append(gt) gt.start() if big_delay > 0: @@ -248,14 +249,14 @@ def __execute_initial_load(self): if self.conf['phases']['panels']: tasks = [TaskPanels, TaskPanelsMenu] stopper.set() - tm = TasksManager(tasks, "Global tasks", stopper, self.config) + tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client) tm.start() tm.join() logger.info("Loading projects") tasks = [TaskProjects] stopper.set() - tm = TasksManager(tasks, "Global tasks", stopper, self.config) + tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client) tm.start() tm.join() logger.info("Projects loaded") @@ -280,7 +281,7 @@ def start(self): # check we have access to the needed ES if not self.check_es_access(): - print('Can not access Elasticsearch service. Exiting sirmordred ...') + print('Can not access ElasticSearch/OpenSearch service. Exiting sirmordred ...') sys.exit(1) # If bestiary is configured check that it is working @@ -289,6 +290,9 @@ def start(self): print('Can not access bestiary service. Exiting sirmordred ...') sys.exit(1) + # Create SortingHat Client + self.__create_sh_client(self.config) + # Initial round: panels and projects loading self.__execute_initial_load() @@ -336,3 +340,29 @@ def start(self): logger.error(var) logger.info("Finished SirMordred engine ...") + + def __create_sh_client(self, config): + self.config = config + self.conf = config.get_conf() + + sortinghat = self.conf.get('sortinghat', None) + self.db_sh = sortinghat['database'] if sortinghat else None + self.db_user = sortinghat['user'] if sortinghat else None + self.db_password = sortinghat['password'] if sortinghat else None + self.db_host = sortinghat['host'] if sortinghat else '127.0.0.1' + self.db_path = sortinghat.get('path', None) if sortinghat else None + self.db_port = sortinghat.get('port', None) if sortinghat else None + self.db_ssl = sortinghat.get('ssl', False) if sortinghat else False + self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True + self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None + self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None + if sortinghat and not hasattr(self, 'client'): + self.client = SortingHatClient(host=self.db_host, port=self.db_port, + path=self.db_path, ssl=self.db_ssl, + user=self.db_user, password=self.db_password, + verify_ssl=self.db_verify_ssl, + tenant=self.db_tenant) + self.client.connect() + logger.info("SORTINGHAT") + elif not sortinghat: + self.client = None diff --git a/sirmordred/task.py b/sirmordred/task.py index 652dbc58..2ba994fc 100644 --- a/sirmordred/task.py +++ b/sirmordred/task.py @@ -29,7 +29,6 @@ from grimoire_elk.elk import get_ocean_backend from grimoire_elk.utils import get_connector_from_name, get_elastic from grimoire_elk.enriched.utils import grimoire_con -from sortinghat.cli.client import SortingHatClient logger = logging.getLogger(__name__) @@ -42,10 +41,11 @@ class Task(): 'studies', 'node_regex', 'anonymize'] PARAMS_WITH_SPACES = ['blacklist-jobs'] - def __init__(self, config): + def __init__(self, config, sortinghat_client): self.backend_section = None self.config = config self.conf = config.get_conf() + self.client = sortinghat_client sortinghat = self.conf.get('sortinghat', None) self.db_sh = sortinghat['database'] if sortinghat else None @@ -58,15 +58,6 @@ def __init__(self, config): self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None - if sortinghat: - self.client = SortingHatClient(host=self.db_host, port=self.db_port, - path=self.db_path, ssl=self.db_ssl, - user=self.db_user, password=self.db_password, - verify_ssl=self.db_verify_ssl, - tenant=self.db_tenant) - self.client.connect() - else: - self.client = None self.grimoire_con = grimoire_con(conn_retries=12) # 30m retry diff --git a/sirmordred/task_autorefresh.py b/sirmordred/task_autorefresh.py index 3b81d434..7a895935 100644 --- a/sirmordred/task_autorefresh.py +++ b/sirmordred/task_autorefresh.py @@ -43,8 +43,8 @@ class TaskAutorefresh(Task): """Refresh the last modified identities for all the backends.""" - def __init__(self, config): - super().__init__(config) + def __init__(self, config, sortinghat_client): + super().__init__(config, sortinghat_client) self.last_autorefresh_backend = {} diff --git a/sirmordred/task_collection.py b/sirmordred/task_collection.py index 47eb875f..e361af5b 100644 --- a/sirmordred/task_collection.py +++ b/sirmordred/task_collection.py @@ -41,8 +41,8 @@ class TaskRawDataCollection(Task): """ Basic class shared by all collection tasks """ - def __init__(self, config, backend_section=None, allowed_repos=None): - super().__init__(config) + def __init__(self, config, sortinghat_client, backend_section=None, allowed_repos=None): + super().__init__(config, sortinghat_client) self.backend_section = backend_section self.allowed_repos = set(allowed_repos) if allowed_repos else None diff --git a/sirmordred/task_enrich.py b/sirmordred/task_enrich.py index 4d27e755..707e75bd 100644 --- a/sirmordred/task_enrich.py +++ b/sirmordred/task_enrich.py @@ -53,8 +53,8 @@ class TaskEnrich(Task): """ Basic class shared by all enriching tasks """ - def __init__(self, config, backend_section=None, allowed_repos=None): - super().__init__(config) + def __init__(self, config, sortinghat_client, backend_section=None, allowed_repos=None): + super().__init__(config, sortinghat_client) self.backend_section = backend_section self.allowed_repos = set(allowed_repos) if allowed_repos else None # This will be options in next iteration diff --git a/sirmordred/task_manager.py b/sirmordred/task_manager.py index 058a890e..13626597 100644 --- a/sirmordred/task_manager.py +++ b/sirmordred/task_manager.py @@ -50,7 +50,7 @@ class TasksManager(threading.Thread): IDENTITIES_TASKS_ON_LOCK = threading.Lock() IDENTITIES_TASKS_ON = False - def __init__(self, tasks_cls, backend_section, stopper, config, timer=0): + def __init__(self, tasks_cls, backend_section, stopper, config, sortinghat_client, timer=0): """ :tasks_cls : tasks classes to be executed using the backend :backend_section: perceval backend section name @@ -64,6 +64,7 @@ def __init__(self, tasks_cls, backend_section, stopper, config, timer=0): self.stopper = stopper # To stop the thread from parent self.timer = timer self.thread_id = None + self.client = sortinghat_client def add_task(self, task): self.tasks.append(task) @@ -80,7 +81,7 @@ def __set_thread_id(): logger.debug(self.tasks_cls) for tc in self.tasks_cls: # create the real Task from the class - task = tc(self.config) + task = tc(self.config, self.client) task.set_backend_section(self.backend_section) self.tasks.append(task)