Config thread safe updated #11122

Open
wants to merge 5 commits into base: master

121 changes: 98 additions & 23 deletions ocs_ci/framework/__init__.py
@@ -15,12 +15,15 @@
from collections.abc import Mapping
from dataclasses import dataclass, field, fields
from ocs_ci.ocs.exceptions import ClusterNotFoundException
from threading import Thread, RLock, local, get_ident

THIS_DIR = os.path.dirname(os.path.abspath(__file__))
DEFAULT_CONFIG_PATH = os.path.join(THIS_DIR, "conf/default_config.yaml")

logger = logging.getLogger(__name__)

config_lock = RLock()


@dataclass
class Config:
@@ -135,9 +138,9 @@ class MultiClusterConfig:
# multiple cluster contexts
def __init__(self):
# Holds all cluster's Config() object
self.thread_local_data = local()
self.clusters = list()
# The current cluster's Config() object is exposed via the cluster_ctx property
self.nclusters = 1
# Index for current cluster in context
self.cur_index = 0
@@ -151,6 +154,18 @@ def __init__(self):
self.single_cluster_default = True
self._single_cluster_init_cluster_configs()

def __getattr__(self, attr):
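"""
Resolve attribute access against the Config object selected by the calling
thread's config index (thread_local_data.config_index), falling back to
cur_index when no thread-local index is set.
"""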
with config_lock:
config_index = getattr(
self.thread_local_data, "config_index", self.cur_index
)
return getattr(self.clusters[config_index], attr)

@property
def cluster_ctx(self):
config_index = getattr(self.thread_local_data, "config_index", self.cur_index)
return self.clusters[config_index]

@property
def default_cluster_ctx(self):
"""
@@ -161,17 +176,25 @@ def default_cluster_ctx(self):
Returns:
ocs_ci.framework.Config: The default cluster context

"""
return self.clusters[self.default_cluster_index]

@property
def default_cluster_index(self):
"""
Get the default cluster index.
The default cluster index is defined by the
ENV_DATA param 'default_cluster_context_index'.

Returns:
int: The default cluster context index

"""
# Get the default index. If not found, the default value is 0
return self.ENV_DATA.get("default_cluster_context_index", 0)

def _single_cluster_init_cluster_configs(self):
self.clusters.insert(0, Config())
self.attr_init()
self._refresh_ctx()

def init_cluster_configs(self):
@@ -181,47 +204,45 @@ def init_cluster_configs(self):
for i in range(self.nclusters):
self.clusters.insert(i, Config())
self.clusters[i].MULTICLUSTER["multicluster_index"] = i
self.attr_init()
# TODO: Delete _refresh_ctx after confirming we don't need it
self._refresh_ctx()
self.single_cluster_default = False

def attr_init(self):
self.attr_list = [attr for attr in self.cluster_ctx.__dataclass_fields__.keys()]

# TODO we can delete function after _refresh_ctx is not needed
def update(self, user_dict):
self.cluster_ctx.update(user_dict)
self._refresh_ctx()

# TODO we can delete function after _refresh_ctx is not needed
def reset(self):
self.cluster_ctx.reset()
self._refresh_ctx()

def get_defaults(self):
return self.cluster_ctx.get_defaults()

# TODO check if we need this function
def reset_ctx(self):
self.cur_index = 0
# TODO: Delete _refresh_ctx after confirming we don't need it
self._refresh_ctx()

def _refresh_ctx(self):
[
self.__setattr__(attr, self.cluster_ctx.__getattribute__(attr))
for attr in self.attr_list
]
self.to_dict = self.cluster_ctx.to_dict
# TODO: We need to get rid of KUBECONFIG from ENV
if self.RUN.get("kubeconfig"):
os.environ["KUBECONFIG"] = self.RUN.get("kubeconfig")

def switch_ctx(self, index=0):
self.cur_index = index
if hasattr(self.thread_local_data, "config_index"):
thread_id = get_ident()
# TODO: Change to debug
logger.info(f"Thread ID: {thread_id} is using config index: {index}")
config.thread_local_data.config_index = index
# TODO: We need to get rid of KUBECONFIG from ENV
self._refresh_ctx()
# Log the switch after changing the current index
logger.info(f"Switched to cluster: {self.current_cluster_name()}")

def switch_acm_ctx(self):
self.switch_ctx(self.get_active_acm_index())
self.cur_index = self.get_active_acm_index()

def get_active_acm_index(self):
"""
@@ -558,6 +579,60 @@ def remove_cluster_by_name(self, cluster_name):
config = MultiClusterConfig()


class ConfigSafeThread(Thread):
"""
Config-aware version of threading.Thread. It sets the config index to be
used by all calls made from the thread by updating config.thread_local_data
with the given config index for the lifetime of the thread.
"""

def __init__(self, config_index, *args, **kwargs):
"""
Constructor for ConfigSafeThread class

Args:
config_index (int): index of config to be used by the thread
"""
with config_lock:
super(ConfigSafeThread, self).__init__(*args, **kwargs)
self.config_index = config_index

def run(self, *args, **kwargs):
config.thread_local_data.config_index = self.config_index
thread_id = get_ident()
# TODO: Change to debug
logger.info(
f"Thread ID: {thread_id} is using config index: {self.config_index}"
)
try:
super(ConfigSafeThread, self).run()
finally:
if hasattr(config.thread_local_data, "config_index"):
del config.thread_local_data.config_index
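
A minimal usage sketch (editor's illustration, not part of this diff): a worker created this way resolves every config attribute against the cluster at the given index for its whole lifetime. The task function and the index 1 below are hypothetical.

def check_cluster_name():
    # Inside this thread, `config` attribute lookups resolve against clusters[1]
    logger.info(f"Cluster name: {config.ENV_DATA.get('cluster_name')}")

worker = ConfigSafeThread(config_index=1, target=check_cluster_name)
worker.start()
worker.join()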


def config_safe_thread_pool_task(config_index, task, *args, **kwargs):
"""
Wrapper function to be executed in ThreadPoolExecutor

Args:
config_index (int): first positional argument defining config index to be used by Thread.
task (function): function to be called by ThreadPoolExecutor

"""
with config_lock:
thread_id = get_ident()
# TODO: Change to debug
logger.info(f"Thread ID: {thread_id} is using config index: {config_index}")
config.thread_local_data.config_index = config_index

try:
return task(*args, **kwargs)
finally:
with config_lock:
del config.thread_local_data.config_index
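
For ThreadPoolExecutor-based parallelism the same pinning is done per submitted task, as the updated tests below do. A minimal sketch, with upload_report as a hypothetical task:

from concurrent.futures import ThreadPoolExecutor

def upload_report(name):
    logger.info(f"Uploading {name} against cluster index {config.default_cluster_index}")

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(
            config_safe_thread_pool_task,
            config.default_cluster_index,
            upload_report,
            f"report-{i}",
        )
        for i in range(4)
    ]
    for future in futures:
        # Wait for completion and surface any exception raised by the task
        future.result()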


class GlobalVariables:
# Test time report
TIMEREPORT_DICT: dict = dict()
5 changes: 3 additions & 2 deletions ocs_ci/utility/utils.py
@@ -642,8 +642,9 @@ def exec_cmd(
cmd = shlex.split(cmd)
if config.RUN.get("custom_kubeconfig_location") and cmd[0] == "oc":
if "--kubeconfig" in cmd:
kube_index = cmd.index("--kubeconfig")
cmd.pop(kube_index + 1)
cmd.pop(kube_index)
cmd = list_insert_at_position(cmd, 1, ["--kubeconfig"])
cmd = list_insert_at_position(
cmd, 2, [config.RUN["custom_kubeconfig_location"]]
3 changes: 3 additions & 0 deletions tests/functional/lvmo/test_lvm_multi_snapshot.py
@@ -9,6 +9,7 @@
skipif_lvm_not_installed,
aqua_squad,
)
from ocs_ci.framework import config, config_safe_thread_pool_task
from ocs_ci.framework.testlib import skipif_ocs_version, ManageTest, acceptance
from ocs_ci.ocs import constants
from ocs_ci.ocs.cluster import LVM
@@ -100,6 +101,8 @@ def test_create_multi_snapshot_from_pvc(
for exec_num in range(0, self.pvc_num):
futures.append(
executor.submit(
config_safe_thread_pool_task,
config.default_cluster_index,
pvc_factory,
project=proj_obj,
interface=None,
16 changes: 10 additions & 6 deletions tests/functional/object/mcg/test_object_versioning.py
@@ -1,11 +1,10 @@
import logging
import pytest
import os

from uuid import uuid4

from ocs_ci.framework import config, ConfigSafeThread
from ocs_ci.framework.pytest_customization.marks import (
bugzilla,
tier2,
@@ -107,12 +106,17 @@ def test_versioning_parallel_ops(
command = f'psql -h 127.0.0.1 -p 5432 -U postgres -d nbcore -c "{query}"'

# perform PUT and DELETE in parallel in a loop
config_index = config.default_cluster_index
for i in range(0, 5):
ConfigSafeThread(
config_index=config_index,
target=s3_delete_object,
args=(s3_obj, bucket_name, filename),
).start()
ConfigSafeThread(
config_index=config_index,
target=s3_put_object,
args=(s3_obj, bucket_name, filename, filename),
).start()

# head object
20 changes: 17 additions & 3 deletions tests/functional/object/mcg/test_write_to_bucket.py
@@ -7,6 +7,7 @@
import pytest
from flaky import flaky

from ocs_ci.framework import config, config_safe_thread_pool_task
from ocs_ci.framework.pytest_customization.marks import (
vsphere_platform_required,
skip_inconsistent,
@@ -59,7 +60,13 @@ def pod_io(pods):
"""
with ThreadPoolExecutor() as p:
for pod in pods:
p.submit(
config_safe_thread_pool_task,
config.default_cluster_index,
pod.run_io,
"fs",
"1G",
)


@pytest.fixture(scope="function")
@@ -370,11 +377,18 @@ def test_write_to_bucket_rbd_cephfs(
full_object_path = f"s3://{bucketname}"
target_dir = AWSCLI_TEST_OBJ_DIR
with ThreadPoolExecutor() as p:
p.submit(
config_safe_thread_pool_task,
config.default_cluster_index,
pod_io,
setup_rbd_cephfs_pods,
)
p.submit(
config_safe_thread_pool_task,
config.default_cluster_index,
sync_object_directory,
awscli_pod_session,
target_dir,
full_object_path,
mcg_obj,
)

@tier2