From 6b35a3b152384d5db3e34bd0dc8711b6d3461839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B8=A0=E7=A3=8A?= Date: Wed, 19 Jun 2024 11:35:20 +0800 Subject: [PATCH] fix rca ddl_disk_full_scene --- handler/rca/scene/ddl_disk_full_scene.py | 148 +++++++++++++---------- 1 file changed, 83 insertions(+), 65 deletions(-) diff --git a/handler/rca/scene/ddl_disk_full_scene.py b/handler/rca/scene/ddl_disk_full_scene.py index ea87065e..50a7c734 100644 --- a/handler/rca/scene/ddl_disk_full_scene.py +++ b/handler/rca/scene/ddl_disk_full_scene.py @@ -22,6 +22,21 @@ from common.tool import StringUtils +def translate_byte(B): + if B < 0: + B = -B + return '-' + translate_byte(B) + if B == 0: + return '0B' + units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'] + k = 1024 + i = 0 + while B >= k and i < len(units) - 1: + B /= k + i += 1 + return f"{B:.2f} {units[i]}" + + class DDlDiskFullScene(RcaScene): def __init__(self): super().__init__() @@ -35,19 +50,18 @@ def __init__(self): def init(self, context): super().init(context) - + minVersion = "4.0.0.0" ## observer version>4.2.1.0 observer_version = self.observer_version if observer_version is None or len(observer_version.strip()) == 0: raise RCAInitException("observer version is None. Please check the NODES conf.") - if not (observer_version == "4.2.1.0" or StringUtils.compare_versions_greater(observer_version, "4.2.1.0")): - self.stdio.error("observer version is {0}, which is less than 4.2.1.0.".format(observer_version)) - raise RCAInitException("observer version is {0}, which is less than 4.2.1.0.".format(observer_version)) + if StringUtils.compare_versions_greater(minVersion, observer_version): + self.stdio.error("observer version is {0}, which is less than {1}.".format(observer_version, minVersion)) + raise RCAInitException("observer version is {0}, which is less than {1}.".format(observer_version, minVersion)) if self.ob_connector is None: raise RCAInitException("ob_connector is None. 
Please check the NODES conf.") self.verbose("observer version is {0}.".format(observer_version)) # check table_name and tenant_name - table_name = self.input_parameters.get("table_name") tenant_name = self.input_parameters.get("tenant_name") action_type = self.input_parameters.get("action_type") @@ -74,7 +88,7 @@ def init(self, context): if self.tenant_id is None: raise RCAInitException("can not find tenant id by tenant name: {0}. Please check the tenant name.".format(tenant_name)) - table_id_data = self.ob_connector.execute_sql("select table_id from oceanbase.__all_virtual_table where table_name = '{0}';".format(table_name)) + table_id_data = self.ob_connector.execute_sql("select table_id from oceanbase.__all_virtual_table where table_name = '{0}' and tenant_id = '{1}';".format(table_name, self.tenant_id)) if len(table_id_data) == 0: raise RCAInitException("can not find table id by table name: {0}. Please check the table name.".format(table_name)) self.table_id = table_id_data[0][0] @@ -92,15 +106,19 @@ def execute(self): # get estimated_data_size self.verbose("start to get estimated_data_size...") ## if the action is not add_index + # Get the size of the source table on each node, in bytes (B) + # self.stdio._call_stdio('start_loading', 'gstart query estimated_data_size, please wait some minutes...') sql = "select svr_ip, svr_port, sum(original_size) as estimated_data_size from oceanbase.__all_virtual_tablet_sstable_macro_info where tablet_id in (select tablet_id from oceanbase.__all_virtual_tablet_to_table_history where table_id = {0}) and (svr_ip, svr_port) in (select svr_ip, svr_port from oceanbase.__all_virtual_ls_meta_table where role = 1) group by svr_ip, svr_port;".format( self.table_id ) self.verbose("execute_sql is {0}".format(sql)) - tablet_size_data = self.ob_connector.execute_sql(sql) - self.verbose("tablet_size_data is {0}".format(tablet_size_data)) - self.record.add_record("tablet_size_data is {0}".format(tablet_size_data)) - if len(tablet_size_data) <= 0 or tablet_size_data[0][2] is None: 
- raise RCAExecuteException("can not find tablet size info or estimated_data_size. please check the data:{0}.".format(tablet_size_data)) + tablet_size_data = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall() + # self.stdio._call_stdio('stop_loading', 'succeed') + for item in tablet_size_data: + tablet_size_data_ip = item["svr_ip"] + tablet_size_data_port = item["svr_port"] + tablet_size_data_estimated_data_size = item["estimated_data_size"] + self.record.add_record("on {0}:{1} tablet_size: {2} as {3}".format(tablet_size_data_ip, tablet_size_data_port, tablet_size_data_estimated_data_size, translate_byte(tablet_size_data_estimated_data_size))) self.estimated_size = tablet_size_data self.verbose("estimated_size is {0}".format(self.estimated_size)) self.record.add_record("estimated_size is {0}".format(self.estimated_size)) @@ -113,83 +131,83 @@ def execute(self): ## if the action is add_index sql = "select table_id from oceanbase.__all_virtual_table_history where tenant_id = '{0}' and data_table_id = '{1}' and table_name like '%{2}%';".format(self.tenant_id, self.table_id, self.index_name) self.verbose("execute_sql is {0}".format(sql)) - self.index_table_id = self.ob_connector.execute_sql(sql)[0][0] + self.index_table_id = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["table_id"] self.verbose("index_table_id is {0}".format(self.index_table_id)) self.record.add_record("index_table_id is {0}".format(self.index_table_id)) # Query the sum of the lengths of all columns in the main table - sql = "select table_id, sum(data_length) from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.table_id) + sql = "select table_id, sum(data_length) as data_length from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.table_id) self.verbose("execute_sql is {0}".format(sql)) - main_table_sum_of_data_length = 
int(self.ob_connector.execute_sql(sql)[0][1]) - self.verbose("main_table_sum_of_data_length is {0}".format(main_table_sum_of_data_length)) + main_table_sum_of_data_length = int(self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["data_length"]) self.record.add_record("main_table_sum_of_data_length is {0}".format(main_table_sum_of_data_length)) # The sum of the lengths of all columns in the query index - sql = "select table_id, sum(data_length) from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.index_table_id) + sql = "select table_id, sum(data_length) as data_length from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.index_table_id) self.verbose("execute_sql is {0}".format(sql)) - index_table_sum_of_data_length = int(self.ob_connector.execute_sql(sql)[0][1]) + index_table_sum_of_data_length = int(self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["data_length"]) self.verbose("index_table_sum_of_data_length is {0}".format(index_table_sum_of_data_length)) self.record.add_record("index_table_sum_of_data_length is {0}".format(index_table_sum_of_data_length)) # new_estimated_size = [] for node_estimated_size in self.estimated_size: - new_node_estimated_size = [node_estimated_size[0], node_estimated_size[1]] - estimiated_index_size = int(index_table_sum_of_data_length / main_table_sum_of_data_length / 1024 / 1024 * int(node_estimated_size[2])) - - new_node_estimated_size.append(estimiated_index_size) + new_node_estimated_size = {} + new_node_estimated_size["svr_ip"] = node_estimated_size["svr_ip"] + new_node_estimated_size["svr_port"] = node_estimated_size["svr_port"] + estimiated_index_size = int(index_table_sum_of_data_length / main_table_sum_of_data_length * int(node_estimated_size["estimated_data_size"])) + self.record.add_record( + "estimated_index_size without magnification {0}B as {1} 
from: index_table_sum_of_data_length({2})/main_table_sum_of_data_length({3}) * estimated_data_size({4})".format( + estimiated_index_size, translate_byte(estimiated_index_size), index_table_sum_of_data_length, main_table_sum_of_data_length, int(node_estimated_size["estimated_data_size"]) + ) + ) + if self.observer_version == "4.2.3.0" or StringUtils.compare_versions_greater(self.observer_version, "4.2.3.0"): + self.record.add_record("magnification is 1.5") + target_server_estimated_size = int(estimiated_index_size * 15 / 10) + else: + self.record.add_record("magnification is 5.5") + target_server_estimated_size = int(estimiated_index_size * 55 / 10) + self.record.add_record("estimated_index_size with magnification is {0}B as {1}".format(target_server_estimated_size, translate_byte(target_server_estimated_size))) + new_node_estimated_size["estimiated_index_size"] = estimiated_index_size new_estimated_size.append(new_node_estimated_size) + self.record.add_record( + "On target_server_ip is {0}, target_server_port is {1}, estimiated_index_size is {2}B as {3}".format(node_estimated_size["svr_ip"], node_estimated_size["svr_port"], estimiated_index_size, translate_byte(estimiated_index_size)) + ) self.estimated_size = new_estimated_size - self.verbose("estimated_size is {0}".format(self.estimated_size)) - - for estimated_size in self.estimated_size: - target_server_ip = estimated_size[0] - target_server_port = estimated_size[1] - target_server_estimated_size = int(estimated_size[2]) - self.verbose("On target_server_ip is {0}, target_server_port is {1}, target_server_estimated_size is {2}".format(target_server_ip, target_server_port, target_server_estimated_size)) - self.record.add_record("On target_server_ip is {0}, target_server_port is {1}, target_server_estimated_size is {2}".format(target_server_ip, target_server_port, target_server_estimated_size)) - - # get target_server_total_size and target_server_used_size - target_server_data = 
self.ob_connector.execute_sql("select total_size, used_size from oceanbase.__all_virtual_disk_stat where svr_ip = '{0}' and svr_port = {1};".format(target_server_ip, target_server_port)) - target_server_total_size = int(target_server_data[0][0]) - self.verbose("target_server_total_size is {0}".format(target_server_total_size)) - self.record.add_record("target_server_total_size is {0}".format(target_server_total_size)) - - target_server_used_size = int(target_server_data[0][1]) - self.verbose("target_server_used_size is {0}".format(target_server_used_size)) - self.record.add_record("target_server_used_size is {0}".format(target_server_used_size)) - - # get data_disk_usage_limit_percentage - sql = "SELECT VALUE FROM oceanbase.GV$OB_PARAMETERS WHERE SVR_IP='{0}' and SVR_PORT='{1}' and NAME LIKE \"data_disk_usage_limit_percentage\"".format(target_server_ip, target_server_port) - self.verbose("execute_sql is {0}".format(sql)) - data_disk_usage_limit_percentage = int(self.ob_connector.execute_sql(sql)[0][0]) - # data_disk_usage_limit_percentage is a Cluster level configuration items - self.verbose("data_disk_usage_limit_percentage is {0}".format(data_disk_usage_limit_percentage)) - self.record.add_record("data_disk_usage_limit_percentage is {0}".format(data_disk_usage_limit_percentage)) - if self.observer_version == "4.2.3.0" or StringUtils.compare_versions_greater(self.observer_version, "4.2.3.0"): - target_server_estimated_size = int(target_server_estimated_size * 15 / 10) - else: - target_server_estimated_size = int(target_server_estimated_size * 55 / 10) - self.verbose("target_server_estimated_size is {0}".format(target_server_estimated_size)) - self.record.add_record("target_server_estimated_size is {0}".format(target_server_estimated_size)) - - available_disk_space = int(target_server_total_size / 100 * data_disk_usage_limit_percentage - target_server_used_size) - self.verbose("available_disk_space is {0}".format(available_disk_space)) - 
self.record.add_record("available_disk_space is {0}".format(available_disk_space)) - - if target_server_estimated_size - available_disk_space > 0: - self.record.add_record("target_server_estimated_size - available_disk_space is {0}".format(target_server_estimated_size - available_disk_space)) - self.record.add_suggest("the disk space of server({0}:{1}) disk is not enough. please add the server disk".format(target_server_ip, target_server_port)) - else: - self.record.add_record("target_server_estimated_size - available_disk_space is {0}".format(target_server_estimated_size - available_disk_space)) - self.record.add_suggest("the disk space of server({0}:{1}) is enough. Don't warn ".format(target_server_ip, target_server_port)) + for estimated_size in self.estimated_size: + target_server_ip = estimated_size["svr_ip"] + target_server_port = estimated_size["svr_port"] + target_server_estimated_size = int(estimated_size["estimiated_index_size"]) + # Final amount of space required + self.record.add_record("On target_serveris {0}:{1}".format(target_server_ip, target_server_port)) + self.record.add_record("target_server_estimated_size is {0}B as {1}".format(target_server_estimated_size, translate_byte(target_server_estimated_size))) + # Start collecting the available space + # get target_server_total_size and target_server_used_size + target_server_data = self.ob_connector.execute_sql_return_cursor_dictionary( + "select total_size, used_size from oceanbase.__all_virtual_disk_stat where svr_ip = '{0}' and svr_port = {1};".format(target_server_ip, target_server_port) + ).fetchall() + target_server_total_size = int(target_server_data[0]["total_size"]) + self.record.add_record("target_server_total_size is {0}B as {1}".format(target_server_total_size, translate_byte(target_server_total_size))) + target_server_used_size = int(target_server_data[0]["used_size"]) + self.record.add_record("target_server_used_size is {0}B as {1}".format(target_server_used_size, translate_byte(target_server_used_size))) + # get 
data_disk_usage_limit_percentage + sql = "SELECT VALUE FROM oceanbase.GV$OB_PARAMETERS WHERE SVR_IP='{0}' and SVR_PORT='{1}' and NAME LIKE \"data_disk_usage_limit_percentage\"".format(target_server_ip, target_server_port) + self.verbose("execute_sql is {0}".format(sql)) + data_disk_usage_limit_percentage = int(self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["VALUE"]) + # data_disk_usage_limit_percentage is a Cluster level configuration items + self.record.add_record("data_disk_usage_limit_percentage is {0}".format(data_disk_usage_limit_percentage)) + available_disk_space = int(target_server_total_size / 100 * data_disk_usage_limit_percentage - target_server_used_size) + self.record.add_record("available_disk_space is {0}B as {1}".format(available_disk_space, translate_byte(available_disk_space))) + self.record.add_record("available_disk_space - target_server_estimated_size is {0}B as {1}".format(available_disk_space - target_server_estimated_size, translate_byte(available_disk_space - target_server_estimated_size))) + if target_server_estimated_size > available_disk_space: + self.record.add_suggest("the disk space of server({0}:{1}) disk is not enough. please add the server disk".format(target_server_ip, target_server_port)) + else: + self.record.add_suggest("the disk space of server({0}:{1}) is enough. Don't warn ".format(target_server_ip, target_server_port)) except Exception as e: raise RCAExecuteException("DDlDiskFullScene execute error: {0}".format(e)) finally: self.stdio.verbose("end DDlDiskFullScene execute") def get_scene_info(self): - return { "name": "ddl_disk_full", "info_en": "Insufficient disk space reported during DDL process. ",