Skip to content

Commit

Permalink
optimize use of context (#585)
Browse files Browse the repository at this point in the history
* add context for OBConnector

* black format

* recover call stdio

* use context.stdio for OBConnector

* balck format

* remove stdio parameter for update_obcluster_nodes
  • Loading branch information
xiaodong-ji authored Nov 28, 2024
1 parent 63b3879 commit a513212
Show file tree
Hide file tree
Showing 21 changed files with 44 additions and 37 deletions.
10 changes: 5 additions & 5 deletions common/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def get_observer_version(context):
stdio.verbose("get observer version, by sql")
obcluster = context.cluster_config
# by sql
observer_version = get_observer_version_by_sql(obcluster, stdio)
observer_version = get_observer_version_by_sql(context, obcluster)
except Exception as e:
try:
stdio.verbose("get observer version, by sql fail. by ssh")
Expand Down Expand Up @@ -325,15 +325,15 @@ def get_obproxy_version(context):
# Only applicable to the community version


def get_observer_version_by_sql(ob_cluster, stdio=None):
stdio.verbose("start get_observer_version_by_sql . input: {0}:{1}".format(ob_cluster.get("db_host"), ob_cluster.get("db_port")))
def get_observer_version_by_sql(context, ob_cluster):
context.stdio.verbose("start get_observer_version_by_sql . input: {0}:{1}".format(ob_cluster.get("db_host"), ob_cluster.get("db_port")))
try:
ob_connector = OBConnector(ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), stdio=stdio, timeout=100)
ob_connector = OBConnector(context=context, ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), timeout=100)
ob_version_info = ob_connector.execute_sql("select version();")
except Exception as e:
raise Exception("get_observer_version_by_sql Exception. Maybe cluster'info is error: " + e.__str__())
ob_version = ob_version_info[0]
stdio.verbose("get_observer_version_by_sql ob_version_info is {0}".format(ob_version))
context.stdio.verbose("get_observer_version_by_sql ob_version_info is {0}".format(ob_version))
version = re.findall(r'OceanBase(_)?(.CE)?-v(.+)', ob_version[0])
if len(version) > 0:
return version[0][2]
Expand Down
4 changes: 2 additions & 2 deletions common/config_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(self, context):

def get_cluster_name(self):
ob_version = get_observer_version(self.context)
obConnetcor = OBConnector(ip=self.db_host, port=self.db_port, username=self.sys_tenant_user, password=self.sys_tenant_password, stdio=self.stdio, timeout=100)
obConnetcor = OBConnector(context=self.context, ip=self.db_host, port=self.db_port, username=self.sys_tenant_user, password=self.sys_tenant_password, timeout=100)
if ob_version.startswith("3") or ob_version.startswith("2"):
sql = "select cluster_name from oceanbase.v$ob_cluster"
res = obConnetcor.execute_sql(sql)
Expand All @@ -68,7 +68,7 @@ def get_cluster_name(self):

def get_host_info_list_by_cluster(self):
ob_version = get_observer_version(self.context)
obConnetcor = OBConnector(ip=self.db_host, port=self.db_port, username=self.sys_tenant_user, password=self.sys_tenant_password, stdio=self.stdio, timeout=100)
obConnetcor = OBConnector(context=self.context, ip=self.db_host, port=self.db_port, username=self.sys_tenant_user, password=self.sys_tenant_password, timeout=100)
sql = "select SVR_IP, SVR_PORT, ZONE, BUILD_VERSION from oceanbase.DBA_OB_SERVERS"
if ob_version.startswith("3") or ob_version.startswith("2") or ob_version.startswith("1"):
sql = "select SVR_IP, SVR_PORT, ZONE, BUILD_VERSION from oceanbase.__all_server"
Expand Down
5 changes: 3 additions & 2 deletions common/ob_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,22 @@ class OBConnector(object):

def __init__(
self,
context,
ip,
port,
username,
password=None,
database=None,
stdio=None,
timeout=30,
):
self.context = context
self.ip = str(ip)
self.port = int(port)
self.username = str(username)
self.password = str(password)
self.timeout = timeout
self.conn = None
self.stdio = stdio
self.stdio = context.stdio
self.database = database
self.init()

Expand Down
4 changes: 2 additions & 2 deletions core.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def set_context(self, handler_name, namespace, config):
stdio=self.stdio,
inner_config=self.inner_config_manager.config,
)
telemetry.set_cluster_conn(config.get_ob_cluster_config)
telemetry.set_cluster_conn(self.context, config.get_ob_cluster_config)

def set_context_skip_cluster_conn(self, handler_name, namespace, config):
self.context = HandlerContext(
Expand Down Expand Up @@ -175,7 +175,7 @@ def update_obcluster_nodes(self, config):
return

ob_version = get_observer_version_by_sql(ob_cluster, self.stdio)
obConnetcor = OBConnector(ip=ob_cluster["db_host"], port=ob_cluster["db_port"], username=ob_cluster["tenant_sys"]["user"], password=ob_cluster["tenant_sys"]["password"], stdio=self.stdio)
obConnetcor = OBConnector(context=self.context, ip=ob_cluster["db_host"], port=ob_cluster["db_port"], username=ob_cluster["tenant_sys"]["user"], password=ob_cluster["tenant_sys"]["password"])

sql = "select SVR_IP, SVR_PORT, ZONE, BUILD_VERSION from oceanbase.__all_server"
if ob_version.startswith(("1", "2", "3")):
Expand Down
2 changes: 1 addition & 1 deletion handler/analyzer/analyze_index_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def init_option(self):
ob_cluster = self.context.cluster_config
self.stdio.verbose('cluster config: {0}'.format(StringUtils.mask_passwords(ob_cluster)))
self.ob_cluster = ob_cluster
self.sys_connector = OBConnector(ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=100)
self.sys_connector = OBConnector(context=self.context, ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), timeout=100)
tenant_name = Util.get_option(options, 'tenant_name')
table_name = Util.get_option(options, 'table_name')
index_name = Util.get_option(options, 'index_name')
Expand Down
2 changes: 1 addition & 1 deletion handler/analyzer/analyze_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def init_option(self):
def get_version(self):
observer_version = ""
try:
observer_version = get_observer_version_by_sql(self.ob_cluster, self.stdio)
observer_version = get_observer_version_by_sql(self.context, self.ob_cluster)
except Exception as e:
self.stdio.exception("failed to get observer version:{0}".format(e))
self.stdio.verbose("get observer version: {0}".format(observer_version))
Expand Down
4 changes: 2 additions & 2 deletions handler/analyzer/analyze_parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ def __init__(self, context, analyze_type='default'):
self.observer_nodes = self.context.cluster_config.get("servers")
try:
self.obconn = OBConnector(
context=self.context,
ip=self.ob_cluster.get("db_host"),
port=self.ob_cluster.get("db_port"),
username=self.ob_cluster.get("tenant_sys").get("user"),
password=self.ob_cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000,
database="oceanbase",
)
Expand All @@ -59,7 +59,7 @@ def __init__(self, context, analyze_type='default'):
def get_version(self):
observer_version = ""
try:
observer_version = get_observer_version_by_sql(self.ob_cluster, self.stdio)
observer_version = get_observer_version_by_sql(self.context, self.ob_cluster)
except Exception as e:
self.stdio.warn("failed to get observer version:{0}".format(e))
self.stdio.verbose("get observer version: {0}".format(observer_version))
Expand Down
4 changes: 2 additions & 2 deletions handler/analyzer/analyze_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ def __init__(self, context):
self.scope = None
try:
self.obconn = OBConnector(
context=self.context,
ip=self.ob_cluster.get("db_host"),
port=self.ob_cluster.get("db_port"),
username=self.ob_cluster.get("tenant_sys").get("user"),
password=self.ob_cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000,
database="oceanbase",
)
Expand Down Expand Up @@ -159,7 +159,7 @@ def init_option(self):
def get_version(self):
observer_version = ""
try:
observer_version = get_observer_version_by_sql(self.ob_cluster, self.stdio)
observer_version = get_observer_version_by_sql(self.context, self.ob_cluster)
except Exception as e:
self.stdio.warn("AnalyzeQueueHandler failed to get observer version:{0}".format(e))
self.stdio.verbose("AnalyzeQueueHandler get observer version: {0}".format(observer_version))
Expand Down
4 changes: 2 additions & 2 deletions handler/analyzer/analyze_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def init_config(self):
ob_cluster = self.context.cluster_config
self.stdio.verbose('cluster config: {0}'.format(StringUtils.mask_passwords(ob_cluster)))
self.ob_cluster = ob_cluster
self.sys_connector = OBConnector(ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=100)
self.sys_connector = OBConnector(context=self.context, ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), timeout=100)
self.ob_cluster_name = ob_cluster.get("ob_cluster_name")
self.stdio.print('init cluster config complete')
return True
Expand All @@ -134,7 +134,7 @@ def init_ob_version(self):
def init_db_connector(self):
if self.db_user:
self.db_connector_provided = True
self.db_connector = OBConnector(ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=self.db_user, password=self.db_password, stdio=self.stdio, timeout=100)
self.db_connector = OBConnector(context=self.context, ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=self.db_user, password=self.db_password, timeout=100)
else:
self.db_connector = self.sys_connector

Expand Down
4 changes: 2 additions & 2 deletions handler/analyzer/analyze_sql_review.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def init_config(self):
ob_cluster = self.context.cluster_config
self.stdio.verbose('cluster config: {0}'.format(StringUtils.mask_passwords(ob_cluster)))
self.ob_cluster = ob_cluster
self.sys_connector = OBConnector(ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=100)
self.sys_connector = OBConnector(context=self.context, ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), timeout=100)
self.ob_cluster_name = ob_cluster.get("ob_cluster_name")
self.stdio.print('init cluster config complete')
return True
Expand All @@ -67,7 +67,7 @@ def init_db_connector(self):
if self.db_user:
self.stdio.verbose("init db connector start")
self.db_connector_provided = True
self.db_connector = OBConnector(ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=self.db_user, password=self.db_password, stdio=self.stdio, timeout=100)
self.db_connector = OBConnector(context=self.context, ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=self.db_user, password=self.db_password, timeout=100)
self.stdio.verbose("init db connector complete")
else:
self.db_connector = self.sys_connector
Expand Down
2 changes: 1 addition & 1 deletion handler/analyzer/analyze_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ def __init__(self, context, analyze_type='diff'):
self.observer_nodes = self.context.cluster_config.get("servers")
try:
self.obconn = OBConnector(
context=self.context,
ip=self.ob_cluster.get("db_host"),
port=self.ob_cluster.get("db_port"),
username=self.ob_cluster.get("tenant_sys").get("user"),
password=self.ob_cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000,
database="oceanbase",
)
Expand Down
2 changes: 1 addition & 1 deletion handler/checker/check_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def __init__(self, context, max_size, cluster):
self.stdio.verbose("obConnectorPool init success!")
try:
for i in range(max_size):
conn = OBConnector(ip=self.cluster.get("db_host"), port=self.cluster.get("db_port"), username=self.cluster.get("tenant_sys").get("user"), password=self.cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=10000)
conn = OBConnector(context=context, ip=self.cluster.get("db_host"), port=self.cluster.get("db_port"), username=self.cluster.get("tenant_sys").get("user"), password=self.cluster.get("tenant_sys").get("password"), timeout=10000)
self.connections.put(conn)
self.stdio.verbose("obConnectorPool init success!")
except Exception as e:
Expand Down
4 changes: 2 additions & 2 deletions handler/display/display_scenes.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self, context, display_pack_dir='./', tasks_base_path="~/.obdiag/di

def init_config(self):
self.cluster = self.context.cluster_config
self.sys_connector = OBConnector(ip=self.cluster.get("db_host"), port=self.cluster.get("db_port"), username=self.cluster.get("tenant_sys").get("user"), password=self.cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=100)
self.sys_connector = OBConnector(context=self.context, ip=self.cluster.get("db_host"), port=self.cluster.get("db_port"), username=self.cluster.get("tenant_sys").get("user"), password=self.cluster.get("tenant_sys").get("password"), timeout=100)
self.obproxy_nodes = self.context.obproxy_config['servers']
self.ob_nodes = self.context.cluster_config['servers']
new_nodes = Util.get_nodes_list(self.context, self.ob_nodes, self.stdio)
Expand Down Expand Up @@ -89,7 +89,7 @@ def execute(self):
self.stdio.error("Internal error :{0}".format(e))

def __init_db_connector(self):
self.db_connector = OBConnector(ip=self.db_conn.get("host"), port=self.db_conn.get("port"), username=self.db_conn.get("user"), password=self.db_conn.get("password"), database=self.db_conn.get("database"), stdio=self.stdio, timeout=100)
self.db_connector = OBConnector(context=self.context, ip=self.db_conn.get("host"), port=self.db_conn.get("port"), username=self.db_conn.get("user"), password=self.db_conn.get("password"), database=self.db_conn.get("database"), timeout=100)

def __init_db_conn(self, cli_connection_string):
try:
Expand Down
8 changes: 7 additions & 1 deletion handler/gather/gather_ash_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ def __init__(self, context, gather_pack_dir='./'):
self.observer_nodes = self.context.cluster_config.get("servers")
try:
self.obconn = OBConnector(
ip=self.cluster.get("db_host"), port=self.cluster.get("db_port"), username=self.cluster.get("tenant_sys").get("user"), password=self.cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=10000, database="oceanbase"
context=self.context,
ip=self.cluster.get("db_host"),
port=self.cluster.get("db_port"),
username=self.cluster.get("tenant_sys").get("user"),
password=self.cluster.get("tenant_sys").get("password"),
timeout=10000,
database="oceanbase",
)
except Exception as e:
self.stdio.error("Failed to connect to database: {0}".format(e))
Expand Down
4 changes: 2 additions & 2 deletions handler/gather/gather_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def __init__(self, context, gather_pack_dir='./'):
self.observer_nodes = self.context.cluster_config.get("servers")
try:
self.obconn = OBConnector(
context=self.context,
ip=self.ob_cluster.get("db_host"),
port=self.ob_cluster.get("db_port"),
username=self.ob_cluster.get("tenant_sys").get("user"),
password=self.ob_cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000,
database="oceanbase",
)
Expand Down Expand Up @@ -77,7 +77,7 @@ def init_option(self):
def get_version(self):
observer_version = ""
try:
observer_version = get_observer_version_by_sql(self.ob_cluster, self.stdio)
observer_version = get_observer_version_by_sql(self.context, self.ob_cluster)
except Exception as e:
self.stdio.warn("failed to get observer version:{0}".format(e))
self.stdio.verbose("get observer version: {0}".format(observer_version))
Expand Down
4 changes: 2 additions & 2 deletions handler/gather/gather_plan_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, context, gather_pack_dir='./', is_scene=False):
def init_config(self):
ob_cluster = self.context.cluster_config
self.ob_cluster = ob_cluster
self.sys_connector = OBConnector(ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=100)
self.sys_connector = OBConnector(context=self.context, ip=ob_cluster.get("db_host"), port=ob_cluster.get("db_port"), username=ob_cluster.get("tenant_sys").get("user"), password=ob_cluster.get("tenant_sys").get("password"), timeout=100)
self.ob_cluster_name = ob_cluster.get("ob_cluster_name")
return True

Expand Down Expand Up @@ -92,7 +92,7 @@ def init_option(self):
return self.tenant_mode_detected()

def __init_db_connector(self):
self.db_connector = OBConnector(ip=self.db_conn.get("host"), port=self.db_conn.get("port"), username=self.db_conn.get("user"), password=self.db_conn.get("password"), database=self.db_conn.get("database"), stdio=self.stdio, timeout=100)
self.db_connector = OBConnector(context=self.context, ip=self.db_conn.get("host"), port=self.db_conn.get("port"), username=self.db_conn.get("user"), password=self.db_conn.get("password"), database=self.db_conn.get("database"), timeout=100)

def handle(self):
if not self.init_config():
Expand Down
4 changes: 2 additions & 2 deletions handler/gather/gather_tabledump.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ def init(self):
else:
self.tenant_name = self.__extract_string(user)
self.ob_connector = OBConnector(
ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=self.ob_cluster.get("tenant_sys").get("user"), password=self.ob_cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=100
context=self.context, ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=self.ob_cluster.get("tenant_sys").get("user"), password=self.ob_cluster.get("tenant_sys").get("password"), timeout=100
)
self.tenant_connector = OBConnector(ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=user, password=password, stdio=self.stdio, timeout=100)
self.tenant_connector = OBConnector(context=self.context, ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=user, password=password, timeout=100)
self.file_name = "{0}/obdiag_tabledump_result_{1}.txt".format(self.store_dir, TimeUtils.timestamp_to_filename_time(self.gather_timestamp))
return True
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion handler/gather/gather_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def __init__(self, context, gather_pack_dir='./'):
self.observer_nodes = self.context.cluster_config.get("servers")
try:
self.obconn = OBConnector(
context=self.context,
ip=self.ob_cluster.get("db_host"),
port=self.ob_cluster.get("db_port"),
username=self.ob_cluster.get("tenant_sys").get("user"),
password=self.ob_cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000,
database="oceanbase",
)
Expand Down
Loading

0 comments on commit a513212

Please sign in to comment.