Skip to content

Commit

Permalink
Reduce the number of connections to SortingHat server
Browse files Browse the repository at this point in the history
Mordred makes a lot of connections to the SortingHat server, causing
the uWSGI queue to fill up.

Signed-off-by: Quan Zhou <[email protected]>
  • Loading branch information
zhquan committed Aug 28, 2024
1 parent f2a96e0 commit 59452fa
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: Reduced the number of connections to SortingHat
category: performance
author: Quan Zhou
issue: null
notes: >
Mordred makes a lot of connections to the SortingHat server
which could cause the uWSGI queue to fill up. When the uWSGI
queue is full, Mordred cannot connect to the SortingHat server.
40 changes: 35 additions & 5 deletions sirmordred/sirmordred.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from sirmordred.task_manager import TasksManager
from sirmordred.task_panels import TaskPanels, TaskPanelsMenu
from sirmordred.task_projects import TaskProjects
from sortinghat.cli.client import SortingHatClient

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -198,15 +199,15 @@ def _split_tasks(tasks_cls):
repos_backend = self._get_repos_by_backend()
for backend in repos_backend:
# Start new Threads and add them to the threads list to complete
t = TasksManager(backend_tasks, backend, stopper, self.config, small_delay)
t = TasksManager(backend_tasks, backend, stopper, self.config, self.client, small_delay)
threads.append(t)
t.start()

# launch thread for global tasks
if len(global_tasks) > 0:
# FIXME timer is applied to all global_tasks, does it make sense?
# All tasks are executed in the same thread sequentially
gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, big_delay)
gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, self.client, big_delay)
threads.append(gt)
gt.start()
if big_delay > 0:
Expand Down Expand Up @@ -248,14 +249,14 @@ def __execute_initial_load(self):
if self.conf['phases']['panels']:
tasks = [TaskPanels, TaskPanelsMenu]
stopper.set()
tm = TasksManager(tasks, "Global tasks", stopper, self.config)
tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client)
tm.start()
tm.join()

logger.info("Loading projects")
tasks = [TaskProjects]
stopper.set()
tm = TasksManager(tasks, "Global tasks", stopper, self.config)
tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client)
tm.start()
tm.join()
logger.info("Projects loaded")
Expand All @@ -280,7 +281,7 @@ def start(self):

# check we have access to the needed ES
if not self.check_es_access():
print('Can not access Elasticsearch service. Exiting sirmordred ...')
print('Can not access ElasticSearch/OpenSearch service. Exiting sirmordred ...')
sys.exit(1)

# If bestiary is configured check that it is working
Expand All @@ -289,6 +290,9 @@ def start(self):
print('Can not access bestiary service. Exiting sirmordred ...')
sys.exit(1)

# Create SortingHat Client
self.__create_sh_client(self.config)

# Initial round: panels and projects loading
self.__execute_initial_load()

Expand Down Expand Up @@ -336,3 +340,29 @@ def start(self):
logger.error(var)

logger.info("Finished SirMordred engine ...")

def __create_sh_client(self, config):
self.config = config
self.conf = config.get_conf()

sortinghat = self.conf.get('sortinghat', None)
self.db_sh = sortinghat['database'] if sortinghat else None
self.db_user = sortinghat['user'] if sortinghat else None
self.db_password = sortinghat['password'] if sortinghat else None
self.db_host = sortinghat['host'] if sortinghat else '127.0.0.1'
self.db_path = sortinghat.get('path', None) if sortinghat else None
self.db_port = sortinghat.get('port', None) if sortinghat else None
self.db_ssl = sortinghat.get('ssl', False) if sortinghat else False
self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True
self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None
self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None
if sortinghat and not hasattr(self, 'client'):
self.client = SortingHatClient(host=self.db_host, port=self.db_port,
path=self.db_path, ssl=self.db_ssl,
user=self.db_user, password=self.db_password,
verify_ssl=self.db_verify_ssl,
tenant=self.db_tenant)
self.client.connect()
logger.info("SORTINGHAT")
elif not sortinghat:
self.client = None
13 changes: 2 additions & 11 deletions sirmordred/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from grimoire_elk.elk import get_ocean_backend
from grimoire_elk.utils import get_connector_from_name, get_elastic
from grimoire_elk.enriched.utils import grimoire_con
from sortinghat.cli.client import SortingHatClient

logger = logging.getLogger(__name__)

Expand All @@ -42,10 +41,11 @@ class Task():
'studies', 'node_regex', 'anonymize']
PARAMS_WITH_SPACES = ['blacklist-jobs']

def __init__(self, config):
def __init__(self, config, sortinghat_client):
self.backend_section = None
self.config = config
self.conf = config.get_conf()
self.client = sortinghat_client

sortinghat = self.conf.get('sortinghat', None)
self.db_sh = sortinghat['database'] if sortinghat else None
Expand All @@ -58,15 +58,6 @@ def __init__(self, config):
self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True
self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None
self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None
if sortinghat:
self.client = SortingHatClient(host=self.db_host, port=self.db_port,
path=self.db_path, ssl=self.db_ssl,
user=self.db_user, password=self.db_password,
verify_ssl=self.db_verify_ssl,
tenant=self.db_tenant)
self.client.connect()
else:
self.client = None

self.grimoire_con = grimoire_con(conn_retries=12) # 30m retry

Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_autorefresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
class TaskAutorefresh(Task):
"""Refresh the last modified identities for all the backends."""

def __init__(self, config):
super().__init__(config)
def __init__(self, config, sortinghat_client):
super().__init__(config, sortinghat_client)

self.last_autorefresh_backend = {}

Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
class TaskRawDataCollection(Task):
""" Basic class shared by all collection tasks """

def __init__(self, config, backend_section=None, allowed_repos=None):
super().__init__(config)
def __init__(self, config, sortinghat_client, backend_section=None, allowed_repos=None):
super().__init__(config, sortinghat_client)

self.backend_section = backend_section
self.allowed_repos = set(allowed_repos) if allowed_repos else None
Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
class TaskEnrich(Task):
""" Basic class shared by all enriching tasks """

def __init__(self, config, backend_section=None, allowed_repos=None):
super().__init__(config)
def __init__(self, config, sortinghat_client, backend_section=None, allowed_repos=None):
super().__init__(config, sortinghat_client)
self.backend_section = backend_section
self.allowed_repos = set(allowed_repos) if allowed_repos else None
# This will be options in next iteration
Expand Down
5 changes: 3 additions & 2 deletions sirmordred/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TasksManager(threading.Thread):
IDENTITIES_TASKS_ON_LOCK = threading.Lock()
IDENTITIES_TASKS_ON = False

def __init__(self, tasks_cls, backend_section, stopper, config, timer=0):
def __init__(self, tasks_cls, backend_section, stopper, config, sortinghat_client, timer=0):
"""
:tasks_cls : tasks classes to be executed using the backend
:backend_section: perceval backend section name
Expand All @@ -64,6 +64,7 @@ def __init__(self, tasks_cls, backend_section, stopper, config, timer=0):
self.stopper = stopper # To stop the thread from parent
self.timer = timer
self.thread_id = None
self.client = sortinghat_client

def add_task(self, task):
self.tasks.append(task)
Expand All @@ -80,7 +81,7 @@ def __set_thread_id():
logger.debug(self.tasks_cls)
for tc in self.tasks_cls:
# create the real Task from the class
task = tc(self.config)
task = tc(self.config, self.client)
task.set_backend_section(self.backend_section)
self.tasks.append(task)

Expand Down

0 comments on commit 59452fa

Please sign in to comment.