Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix rca ddl_disk_full_scene #263

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 83 additions & 65 deletions handler/rca/scene/ddl_disk_full_scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@
from common.tool import StringUtils


def translate_byte(B):
    """Render a byte count as a human-readable size string.

    Examples: 1536 -> '1.50 KB', 0 -> '0B'. Negative counts are rendered
    by prefixing '-' to the formatted absolute value.
    """
    if B < 0:
        # Format the magnitude, then re-attach the sign.
        return '-' + translate_byte(-B)
    if B == 0:
        return '0B'
    suffixes = ('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
    value = B
    idx = 0
    # Scale down by 1024 until the value fits the unit, capping at the
    # largest suffix so very large inputs never index out of range.
    while value >= 1024 and idx < len(suffixes) - 1:
        value = value / 1024
        idx += 1
    return f"{value:.2f} {suffixes[idx]}"


class DDlDiskFullScene(RcaScene):
def __init__(self):
super().__init__()
Expand All @@ -35,19 +50,18 @@ def __init__(self):

def init(self, context):
super().init(context)

minVersion = "4.0.0.0"
## observer version>4.2.1.0
observer_version = self.observer_version
if observer_version is None or len(observer_version.strip()) == 0:
raise RCAInitException("observer version is None. Please check the NODES conf.")
if not (observer_version == "4.2.1.0" or StringUtils.compare_versions_greater(observer_version, "4.2.1.0")):
self.stdio.error("observer version is {0}, which is less than 4.2.1.0.".format(observer_version))
raise RCAInitException("observer version is {0}, which is less than 4.2.1.0.".format(observer_version))
if StringUtils.compare_versions_greater(minVersion, observer_version):
self.stdio.error("observer version is {0}, which is less than {1}.".format(observer_version, minVersion))
raise RCAInitException("observer version is {0}, which is less than {1}.".format(observer_version, minVersion))
if self.ob_connector is None:
raise RCAInitException("ob_connector is None. Please check the NODES conf.")
self.verbose("observer version is {0}.".format(observer_version))
# check table_name and tenant_name

table_name = self.input_parameters.get("table_name")
tenant_name = self.input_parameters.get("tenant_name")
action_type = self.input_parameters.get("action_type")
Expand All @@ -74,7 +88,7 @@ def init(self, context):
if self.tenant_id is None:
raise RCAInitException("can not find tenant id by tenant name: {0}. Please check the tenant name.".format(tenant_name))

table_id_data = self.ob_connector.execute_sql("select table_id from oceanbase.__all_virtual_table where table_name = '{0}';".format(table_name))
table_id_data = self.ob_connector.execute_sql("select table_id from oceanbase.__all_virtual_table where table_name = '{0}' and tenant_id = '{1}';".format(table_name, self.tenant_id))
if len(table_id_data) == 0:
raise RCAInitException("can not find table id by table name: {0}. Please check the table name.".format(table_name))
self.table_id = table_id_data[0][0]
Expand All @@ -92,15 +106,19 @@ def execute(self):
# get estimated_data_size
self.verbose("start to get estimated_data_size...")
## if the action is not add_index
# 获取各个节点上的源表大小,单位为B
# self.stdio._call_stdio('start_loading', 'gstart query estimated_data_size, please wait some minutes...')
sql = "select svr_ip, svr_port, sum(original_size) as estimated_data_size from oceanbase.__all_virtual_tablet_sstable_macro_info where tablet_id in (select tablet_id from oceanbase.__all_virtual_tablet_to_table_history where table_id = {0}) and (svr_ip, svr_port) in (select svr_ip, svr_port from oceanbase.__all_virtual_ls_meta_table where role = 1) group by svr_ip, svr_port;".format(
self.table_id
)
self.verbose("execute_sql is {0}".format(sql))
tablet_size_data = self.ob_connector.execute_sql(sql)
self.verbose("tablet_size_data is {0}".format(tablet_size_data))
self.record.add_record("tablet_size_data is {0}".format(tablet_size_data))
if len(tablet_size_data) <= 0 or tablet_size_data[0][2] is None:
raise RCAExecuteException("can not find tablet size info or estimated_data_size. please check the data:{0}.".format(tablet_size_data))
tablet_size_data = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()
# self.stdio._call_stdio('stop_loading', 'succeed')
for item in tablet_size_data:
tablet_size_data_ip = item["svr_ip"]
tablet_size_data_port = item["svr_port"]
tablet_size_data_estimated_data_size = item["estimated_data_size"]
self.record.add_record("on {0}:{1} tablet_size: {2} as {3}".format(tablet_size_data_ip, tablet_size_data_port, tablet_size_data_estimated_data_size, translate_byte(tablet_size_data_estimated_data_size)))
self.estimated_size = tablet_size_data
self.verbose("estimated_size is {0}".format(self.estimated_size))
self.record.add_record("estimated_size is {0}".format(self.estimated_size))
Expand All @@ -113,83 +131,83 @@ def execute(self):
## if the action is add_index
sql = "select table_id from oceanbase.__all_virtual_table_history where tenant_id = '{0}' and data_table_id = '{1}' and table_name like '%{2}%';".format(self.tenant_id, self.table_id, self.index_name)
self.verbose("execute_sql is {0}".format(sql))
self.index_table_id = self.ob_connector.execute_sql(sql)[0][0]
self.index_table_id = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["table_id"]
self.verbose("index_table_id is {0}".format(self.index_table_id))
self.record.add_record("index_table_id is {0}".format(self.index_table_id))

# Query the sum of the lengths of all columns in the main table
sql = "select table_id, sum(data_length) from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.table_id)
sql = "select table_id, sum(data_length) as data_length from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.table_id)
self.verbose("execute_sql is {0}".format(sql))
main_table_sum_of_data_length = int(self.ob_connector.execute_sql(sql)[0][1])
self.verbose("main_table_sum_of_data_length is {0}".format(main_table_sum_of_data_length))
main_table_sum_of_data_length = int(self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["data_length"])
self.record.add_record("main_table_sum_of_data_length is {0}".format(main_table_sum_of_data_length))

# The sum of the lengths of all columns in the query index
sql = "select table_id, sum(data_length) from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.index_table_id)
sql = "select table_id, sum(data_length) as data_length from oceanbase.__all_virtual_column_history where tenant_id = '{0}' and table_id = '{1}';".format(self.tenant_id, self.index_table_id)
self.verbose("execute_sql is {0}".format(sql))
index_table_sum_of_data_length = int(self.ob_connector.execute_sql(sql)[0][1])
index_table_sum_of_data_length = int(self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["data_length"])
self.verbose("index_table_sum_of_data_length is {0}".format(index_table_sum_of_data_length))
self.record.add_record("index_table_sum_of_data_length is {0}".format(index_table_sum_of_data_length))

#
new_estimated_size = []
for node_estimated_size in self.estimated_size:
new_node_estimated_size = [node_estimated_size[0], node_estimated_size[1]]
estimiated_index_size = int(index_table_sum_of_data_length / main_table_sum_of_data_length / 1024 / 1024 * int(node_estimated_size[2]))

new_node_estimated_size.append(estimiated_index_size)
new_node_estimated_size = {}
new_node_estimated_size["svr_ip"] = node_estimated_size["svr_ip"]
new_node_estimated_size["svr_port"] = node_estimated_size["svr_port"]
estimiated_index_size = int(index_table_sum_of_data_length / main_table_sum_of_data_length * int(node_estimated_size["estimated_data_size"]))
self.record.add_record(
"estimated_index_size without magnification {0}B as {1} from: index_table_sum_of_data_length({2})/main_table_sum_of_data_length({3}) * estimated_data_size({4})".format(
estimiated_index_size, translate_byte(estimiated_index_size), index_table_sum_of_data_length, main_table_sum_of_data_length, int(node_estimated_size["estimated_data_size"])
)
)
if self.observer_version == "4.2.3.0" or StringUtils.compare_versions_greater(self.observer_version, "4.2.3.0"):
self.record.add_record("magnification is 1.5")
target_server_estimated_size = int(estimiated_index_size * 15 / 10)
else:
self.record.add_record("magnification is 5.5")
target_server_estimated_size = int(estimiated_index_size * 55 / 10)
self.record.add_record("estimated_index_size with magnification is {0}B as {1}".format(target_server_estimated_size, translate_byte(target_server_estimated_size)))
new_node_estimated_size["estimiated_index_size"] = estimiated_index_size
new_estimated_size.append(new_node_estimated_size)
self.record.add_record(
"On target_server_ip is {0}, target_server_port is {1}, estimiated_index_size is {2}B as {3}".format(node_estimated_size["svr_ip"], node_estimated_size["svr_port"], estimiated_index_size, translate_byte(estimiated_index_size))
)
self.estimated_size = new_estimated_size
self.verbose("estimated_size is {0}".format(self.estimated_size))

for estimated_size in self.estimated_size:
target_server_ip = estimated_size[0]
target_server_port = estimated_size[1]
target_server_estimated_size = int(estimated_size[2])
self.verbose("On target_server_ip is {0}, target_server_port is {1}, target_server_estimated_size is {2}".format(target_server_ip, target_server_port, target_server_estimated_size))
self.record.add_record("On target_server_ip is {0}, target_server_port is {1}, target_server_estimated_size is {2}".format(target_server_ip, target_server_port, target_server_estimated_size))

# get target_server_total_size and target_server_used_size
target_server_data = self.ob_connector.execute_sql("select total_size, used_size from oceanbase.__all_virtual_disk_stat where svr_ip = '{0}' and svr_port = {1};".format(target_server_ip, target_server_port))
target_server_total_size = int(target_server_data[0][0])
self.verbose("target_server_total_size is {0}".format(target_server_total_size))
self.record.add_record("target_server_total_size is {0}".format(target_server_total_size))

target_server_used_size = int(target_server_data[0][1])
self.verbose("target_server_used_size is {0}".format(target_server_used_size))
self.record.add_record("target_server_used_size is {0}".format(target_server_used_size))

# get data_disk_usage_limit_percentage
sql = "SELECT VALUE FROM oceanbase.GV$OB_PARAMETERS WHERE SVR_IP='{0}' and SVR_PORT='{1}' and NAME LIKE \"data_disk_usage_limit_percentage\"".format(target_server_ip, target_server_port)
self.verbose("execute_sql is {0}".format(sql))
data_disk_usage_limit_percentage = int(self.ob_connector.execute_sql(sql)[0][0])
# data_disk_usage_limit_percentage is a Cluster level configuration items
self.verbose("data_disk_usage_limit_percentage is {0}".format(data_disk_usage_limit_percentage))
self.record.add_record("data_disk_usage_limit_percentage is {0}".format(data_disk_usage_limit_percentage))
if self.observer_version == "4.2.3.0" or StringUtils.compare_versions_greater(self.observer_version, "4.2.3.0"):
target_server_estimated_size = int(target_server_estimated_size * 15 / 10)
else:
target_server_estimated_size = int(target_server_estimated_size * 55 / 10)
self.verbose("target_server_estimated_size is {0}".format(target_server_estimated_size))
self.record.add_record("target_server_estimated_size is {0}".format(target_server_estimated_size))

available_disk_space = int(target_server_total_size / 100 * data_disk_usage_limit_percentage - target_server_used_size)
self.verbose("available_disk_space is {0}".format(available_disk_space))
self.record.add_record("available_disk_space is {0}".format(available_disk_space))

if target_server_estimated_size - available_disk_space > 0:
self.record.add_record("target_server_estimated_size - available_disk_space is {0}".format(target_server_estimated_size - available_disk_space))
self.record.add_suggest("the disk space of server({0}:{1}) disk is not enough. please add the server disk".format(target_server_ip, target_server_port))
else:
self.record.add_record("target_server_estimated_size - available_disk_space is {0}".format(target_server_estimated_size - available_disk_space))
self.record.add_suggest("the disk space of server({0}:{1}) is enough. Don't warn ".format(target_server_ip, target_server_port))
for estimated_size in self.estimated_size:
target_server_ip = estimated_size["svr_ip"]
target_server_port = estimated_size["svr_port"]
target_server_estimated_size = int(estimated_size["estimiated_index_size"])
# 最终所需空间
self.record.add_record("On target_serveris {0}:{1}".format(target_server_ip, target_server_port))
self.record.add_record("target_server_estimated_size is {0}B as {1}".format(target_server_estimated_size, translate_byte(target_server_estimated_size)))
# 开始收集可用空间
# get target_server_total_size and target_server_used_size
target_server_data = self.ob_connector.execute_sql_return_cursor_dictionary(
"select total_size, used_size from oceanbase.__all_virtual_disk_stat where svr_ip = '{0}' and svr_port = {1};".format(target_server_ip, target_server_port)
).fetchall()
target_server_total_size = int(target_server_data[0]["total_size"])
self.record.add_record("target_server_total_size is {0}B as {1}".format(target_server_total_size, translate_byte(target_server_total_size)))
target_server_used_size = int(target_server_data[0]["used_size"])
self.record.add_record("target_server_used_size is {0}B as {1}".format(target_server_used_size, translate_byte(target_server_used_size)))
# get data_disk_usage_limit_percentage
sql = "SELECT VALUE FROM oceanbase.GV$OB_PARAMETERS WHERE SVR_IP='{0}' and SVR_PORT='{1}' and NAME LIKE \"data_disk_usage_limit_percentage\"".format(target_server_ip, target_server_port)
self.verbose("execute_sql is {0}".format(sql))
data_disk_usage_limit_percentage = int(self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()[0]["VALUE"])
# data_disk_usage_limit_percentage is a Cluster level configuration items
self.record.add_record("data_disk_usage_limit_percentage is {0}".format(data_disk_usage_limit_percentage))
available_disk_space = int(target_server_total_size / 100 * data_disk_usage_limit_percentage - target_server_used_size)
self.record.add_record("available_disk_space is {0}B as {1}".format(available_disk_space, translate_byte(available_disk_space)))
self.record.add_record("available_disk_space - target_server_estimated_size is {0}B as {1}".format(available_disk_space - target_server_estimated_size, translate_byte(available_disk_space - target_server_estimated_size)))
if target_server_estimated_size > available_disk_space:
self.record.add_suggest("the disk space of server({0}:{1}) disk is not enough. please add the server disk".format(target_server_ip, target_server_port))
else:
self.record.add_suggest("the disk space of server({0}:{1}) is enough. Don't warn ".format(target_server_ip, target_server_port))
except Exception as e:
raise RCAExecuteException("DDlDiskFullScene execute error: {0}".format(e))
finally:
self.stdio.verbose("end DDlDiskFullScene execute")

def get_scene_info(self):

return {
"name": "ddl_disk_full",
"info_en": "Insufficient disk space reported during DDL process. ",
Expand Down
Loading