Commit

obdiag analyze memory update

Teingi committed Nov 14, 2024
1 parent b11acae commit 4eebb83

Showing 1 changed file with 38 additions and 146 deletions.
184 changes: 38 additions & 146 deletions handler/analyzer/analyze_memory.py
@@ -12,16 +12,15 @@

"""
@time: 2024/9/18
@file: analyze_memory_module.py
@file: analyze_memory.py
@desc:
"""
import os
import re
import time
from common.command import get_observer_version_by_sql
from common.tool import DirectoryUtil, TimeUtils, Util
from common.obdiag_exception import OBDIAGFormatException
import datetime
from common.ob_log_level import OBLogLevel
from common.constant import const
from common.tool import FileUtil
import tabulate
@@ -54,7 +53,6 @@ def __init__(self, context):
self.grep_args = None
self.scope = 'observer'
self.zip_encrypt = False
self.log_level = OBLogLevel.WARN
self.config_path = const.DEFAULT_CONFIG_PATH
self.version = None

@@ -80,8 +78,6 @@ def init_option(self):
since_option = Util.get_option(options, 'since')
store_dir_option = Util.get_option(options, 'store_dir')
grep_option = Util.get_option(options, 'grep')
scope_option = Util.get_option(options, 'scope')
log_level_option = Util.get_option(options, 'log_level')
files_option = Util.get_option(options, 'files')
temp_dir_option = Util.get_option(options, 'temp_dir')
if files_option:
@@ -124,10 +120,6 @@ def init_option(self):
self.gather_pack_dir = os.path.abspath(store_dir_option)
if grep_option is not None:
self.grep_args = grep_option
if scope_option:
self.scope = scope_option
if log_level_option:
self.log_level = OBLogLevel().get_log_level(scope_option)
if temp_dir_option:
self.gather_ob_log_temporary_dir = temp_dir_option
return True
@@ -137,7 +129,7 @@ def get_version(self):
try:
observer_version = get_observer_version_by_sql(self.ob_cluster, self.stdio)
except Exception as e:
self.stdio.warn("failed to get observer version:{0}".format(e))
self.stdio.exception("failed to get observer version:{0}".format(e))
self.stdio.verbose("get observer version: {0}".format(observer_version))
return observer_version

@@ -153,9 +145,11 @@ def handle(self):
analyze_tuples = []

def handle_from_node(node):
resp, node_results = self.__handle_from_node(node, local_store_parent_dir)
analyze_tuples.append((node.get("ip"), False, resp["error"], node_results))
st = time.time()
resp = self.__handle_from_node(node, local_store_parent_dir)
analyze_tuples.append((node.get("ip"), False, resp["error"], int(time.time() - st), resp["result_pack_path"]))

self.stdio.start_loading('analyze memory start')
if self.is_ssh:
for node in self.nodes:
handle_from_node(node)
@@ -165,30 +159,18 @@ def handle_from_node(node):
node["ip"] = local_ip
handle_from_node(node)

self.stdio.start_loading('analyze result start')
title, field_names, summary_list, summary_details_list = self.__get_overall_summary(analyze_tuples, self.directly_analyze_files)
table = tabulate.tabulate(summary_list, headers=field_names, tablefmt="grid", showindex=False)
self.stdio.stop_loading('analyze result sucess')
self.stdio.print(title)
self.stdio.print(table)
FileUtil.write_append(os.path.join(local_store_parent_dir, "result_details.txt"), title + str(table) + "\n\nDetails:\n\n")

for m in range(len(summary_details_list)):
for n in range(len(field_names)):
extend = "\n\n" if n == len(field_names) - 1 else "\n"
FileUtil.write_append(os.path.join(local_store_parent_dir, "result_details.txt"), field_names[n] + ": " + str(summary_details_list[m][n]) + extend)
last_info = "For more details, please run cmd \033[32m' cat {0} '\033[0m\n".format(os.path.join(local_store_parent_dir, "result_details.txt"))
self.stdio.print(last_info)
# get info from local_store_parent_dir+/result_summary.txt
summary_tuples = self.__get_overall_summary(analyze_tuples)
self.stdio.stop_loading('analyze memory success')
self.stdio.print(summary_tuples)
FileUtil.write_append(os.path.join(local_store_parent_dir, "result_summary.txt"), summary_tuples)
analyze_info = ""
with open(os.path.join(local_store_parent_dir, "result_details.txt"), "r", encoding="utf-8") as f:
with open(os.path.join(local_store_parent_dir, "result_summary.txt"), "r", encoding="utf-8") as f:
analyze_info = f.read()
return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": analyze_info})
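
As a reading aid, here is a minimal, self-contained sketch of the bookkeeping the reworked handle_from_node closure performs, with a hypothetical stub standing in for self.__handle_from_node (the stub name and sample values are illustrative, not part of obdiag):

import time

def fake_handle_from_node(node):
    # Hypothetical stand-in for self.__handle_from_node(node, local_store_parent_dir);
    # it honors the resp contract used above: skip flag, error text, result pack path.
    return {"skip": False, "error": "", "result_pack_path": "/tmp/obdiag/analyze_memory"}

analyze_tuples = []
for node in [{"ip": "10.0.0.1"}, {"ip": "10.0.0.2"}]:
    st = time.time()
    resp = fake_handle_from_node(node)
    # Shape consumed by __get_overall_summary below:
    # (ip, is_err, err_msg, elapsed_seconds, result_pack_path)
    analyze_tuples.append((node.get("ip"), False, resp["error"], int(time.time() - st), resp["result_pack_path"]))
print(analyze_tuples)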

def __handle_from_node(self, node, local_store_parent_dir):
resp = {"skip": False, "error": ""}
resp = {"skip": False, "error": "", "result_pack_path": ""}
remote_ip = node.get("ip") if self.is_ssh else '127.0.0.1'
node_results = []
try:
ssh_client = SshClient(self.context, node)
self.stdio.verbose("Sending Collect Shell Command to node {0} ...".format(remote_ip))
@@ -207,8 +189,9 @@ def __handle_from_node(self, node, local_store_parent_dir):
mkdir(ssh_client, gather_dir_full_path)
log_list, resp = self.__handle_log_list(ssh_client, node, resp)
self.stdio.print(log_list)
resp["result_pack_path"] = local_store_dir
if resp["skip"]:
return resp, node_results
return resp
self.stdio.print(FileUtil.show_file_list_tabulate(remote_ip, log_list, self.stdio))
for log_name in log_list:
if self.directly_analyze_files:
Expand All @@ -217,7 +200,7 @@ def __handle_from_node(self, node, local_store_parent_dir):
else:
self.__pharse_log_file(ssh_client, node=node, log_name=log_name, gather_path=gather_dir_full_path, local_store_dir=local_store_dir)
analyze_log_full_path = "{0}/{1}".format(local_store_dir, log_name)
self.stdio.print(analyze_log_full_path)
self.stdio.verbose("local file storage path: {0}".format(analyze_log_full_path))

tenant_memory_info_dict = dict()
for log_name in log_list:
@@ -226,24 +209,22 @@ def __handle_from_node(self, node, local_store_parent_dir):
analyze_log_full_path = "{0}/{1}".format(local_store_dir, str(log_name).strip(".").replace("/", "_"))
else:
analyze_log_full_path = "{0}/{1}".format(local_store_dir, log_name)
self.stdio.print('analyze memory info in {0} start'.format(log_name))
file_result = self.__parse_log_lines(analyze_log_full_path, memory_info)
self.stdio.verbose('analyze memory info from {0} start'.format(log_name))
self.__parse_log_lines(analyze_log_full_path, memory_info)
for sample_time in memory_info:
for tenant in memory_info[sample_time]:
if tenant in tenant_memory_info_dict:
tenant_memory_info_dict[tenant][sample_time] = memory_info[sample_time][tenant]
else:
tenant_memory_info_dict[tenant] = dict()
tenant_memory_info_dict[tenant][sample_time] = memory_info[sample_time][tenant]
self.stdio.print('analyze memory info in {0} sucess'.format(log_name))
node_results.append(file_result)
self.stdio.verbose('analyze memory info from {0} success'.format(log_name))
result_file = analyze_log_full_path + '.json'
try:
with open(result_file, 'w', encoding='utf8') as rf:
rf.write(json.dumps(memory_info, indent=4))
except Exception as e:
traceback.print_exc()
exit(-1)
self.stdio.exception('write json result failed, error: {0}'.format(e))
try:
fig = go.Figure()
colors = ['blue', 'orange', 'green', 'red', 'purple', 'cyan', 'magenta', 'yellow', 'black', 'brown', 'pink', 'gray', 'lime', 'teal', 'navy']
@@ -455,12 +436,10 @@ def __handle_from_node(self, node, local_store_parent_dir):
f.write(html_top15_combined)
# plot(fig, filename='{0}/TOP15_tenant_hold_memory.html'.format(local_store_dir))
except Exception as e:
traceback.print_exc()
exit(-1)
self.stdio.exception('write html result failed, error: {0}'.format(e))
delete_file(ssh_client, gather_dir_full_path, self.stdio)
ssh_client.ssh_close()

return resp, node_results
return resp
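
The chart-building code collapsed above follows the standard plotly pattern: one trace per tenant, cycling through the colors list, then the figure written out as HTML. A minimal sketch under that assumption; the tenant data here is fabricated and the real trace layout in obdiag may differ:

import plotly.graph_objects as go

# Fabricated sample: tenant -> {sample_time: hold_bytes}
tenant_memory_info_dict = {
    "tenant_1001": {"2024-09-18 10:00:00": 1024, "2024-09-18 10:01:00": 2048},
    "tenant_1002": {"2024-09-18 10:00:00": 512, "2024-09-18 10:01:00": 768},
}
colors = ['blue', 'orange', 'green', 'red', 'purple']
fig = go.Figure()
for i, (tenant, samples) in enumerate(sorted(tenant_memory_info_dict.items())):
    times = sorted(samples)
    fig.add_trace(go.Scatter(x=times, y=[samples[t] for t in times],
                             mode='lines+markers', name=tenant,
                             line=dict(color=colors[i % len(colors)])))
# Same artifact name as the commented-out plot() call above.
fig.write_html('TOP15_tenant_hold_memory.html')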

def __handle_log_list(self, ssh_client, node, resp):
if self.directly_analyze_files:
@@ -488,10 +467,7 @@ def __get_log_name_list(self, ssh_client, node):
"""
home_path = node.get("home_path")
log_path = os.path.join(home_path, "log")
if self.scope == "observer" or self.scope == "rootservice" or self.scope == "election":
get_oblog = "ls -1 -F %s/*%s.log* |grep -v wf|awk -F '/' '{print $NF}'" % (log_path, self.scope)
else:
get_oblog = "ls -1 -F %s/observer.log* %s/rootservice.log* %s/election.log* | awk -F '/' '{print $NF}'" % (log_path, log_path, log_path)
get_oblog = "ls -1 -F %s/*%s.log* |grep -v wf|awk -F '/' '{print $NF}'" % (log_path, self.scope)
log_name_list = []
log_files = ssh_client.exec_cmd(get_oblog)
if log_files:
@@ -555,27 +531,6 @@ def __pharse_offline_log_file(self, ssh_client, log_name, local_store_dir):
else:
download_file(ssh_client, log_name, local_store_path, self.stdio)

def __get_observer_ret_code(self, log_line):
"""
Get the ret code from the observer log
:param log_line
:return: ret_code
"""
prefix = "ret=-"
idx = log_line.find(prefix)
if idx < 0:
return ""
start = idx + len(prefix)
if start >= len(log_line):
return ""
end = start
while end < len(log_line):
c = log_line[end]
if c < '0' or c > '9':
break
end = end + 1
return "-" + log_line[start:end]

def __parse_memory_label(self, file_full_path):
ssh_client = ssh_client_local_client.LocalClient(context=self.context, node={"ssh_type": "local"})
if self.version > '4.0':
@@ -605,13 +560,10 @@ def __parse_log_lines(self, file_full_path, memory_dict):
"""
Process the observer's log line by line
:param file_full_path
:return: error_dict
:return:
"""

error_dict = {}
self.stdio.verbose("start parse log {0}".format(file_full_path))
memory_print_line_list = self.__parse_memory_label(file_full_path)
# self.stdio.print(memory_print_line_list)
if memory_print_line_list:
with open(file_full_path, 'r', encoding='utf8', errors='replace') as file:
line_num = 0
@@ -751,10 +703,9 @@ def __parse_log_lines(self, file_full_path, memory_dict):
tenant_dict['ctx_info'] = []
tenant_dict['ctx_info'].append(ctx_info)
except Exception as e:
print(line)
traceback.print_exc()
self.stdio.exception('parse log failed, error: {0}'.format(e))
self.stdio.verbose("complete parse log {0}".format(file_full_path))
return error_dict
return
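
__parse_log_lines fills the passed-in dict keyed sample-time-first, and __handle_from_node then pivots it tenant-first before charting. A toy illustration of that pivot, with the per-tenant payload simplified to a single number (the real code stores nested ctx/mod details):

# sample_time -> tenant -> payload, as filled by __parse_log_lines
memory_info = {
    "2024-09-18 10:00:00": {"1001": 1024, "1002": 512},
    "2024-09-18 10:01:00": {"1001": 2048},
}
tenant_memory_info_dict = {}
for sample_time in memory_info:
    for tenant in memory_info[sample_time]:
        # setdefault collapses the if/else branch used in __handle_from_node
        tenant_memory_info_dict.setdefault(tenant, {})[sample_time] = memory_info[sample_time][tenant]
assert tenant_memory_info_dict["1001"]["2024-09-18 10:01:00"] == 2048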

def __get_time_from_ob_log_line(self, log_line):
"""
@@ -767,80 +718,21 @@ def __get_time_from_ob_log_line(self, log_line):
time_str = log_line[1 : log_line.find(']')]
return time_str
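
__get_time_from_ob_log_line assumes every observer log line opens with a bracketed timestamp, so the slice log_line[1 : log_line.find(']')] simply strips the brackets. For example (the log line is fabricated):

log_line = "[2024-09-18 10:00:00.123456] INFO  [LIB] ob_malloc_allocator.cpp:123 [0][Y0-0000][T1] dump tenant memory"
time_str = log_line[1 : log_line.find(']')]
print(time_str)  # 2024-09-18 10:00:00.123456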

def __get_trace_id(self, log_line):
"""
Get the trace_id from the observer's log line
:param log_line
:return: trace_id
"""
pattern = re.compile(r'\[Y(.*?)\]')
find = pattern.search(log_line)
if find and find.group(1):
return find.group(1).strip('[').strip(']')

def __get_log_level(self, log_line):
"""
Get the log level from the observer's log line
:param log_line
:return: log level
"""
level_lits = ["DEBUG ", "TRACE ", "INFO ", "WDIAG ", "WARN ", "EDIAG ", "ERROR ", "FATAL "]
length = len(log_line)
if length > 38:
length = 38
for level in level_lits:
idx = log_line[:length].find(level)
if idx != -1:
return OBLogLevel().get_log_level(level.rstrip())
return 0

@staticmethod
def __get_overall_summary(node_summary_tuples, is_files=False):
def __get_overall_summary(node_summary_tuple):
"""
generate overall summary from all node summary tuples
:param node_summary_tuple
:param node_summary_tuple: (node, is_err, err_msg, consume_time, result_pack_path) for each node
:return: a string indicating the overall summary
"""
field_names = ["Node", "Status", "FileName", "ErrorCode", "Message", "Count"]
t = []
t_details = []
field_names_details = field_names
field_names_details.extend(["Cause", "Solution", "First Found Time", "Last Found Time", "Trace_IDS"])
for tup in node_summary_tuples:
is_empty = True
summary_tab = []
field_names = ["Node", "Status"]
field_names.append("Time")
field_names.append("ResultPath")
for tup in node_summary_tuple:
node = tup[0]
is_err = tup[2]
node_results = tup[3]
if is_err:
is_empty = False
t.append([node, "Error:" + tup[2] if is_err else "Completed", None, None, None, None])
t_details.append([node, "Error:" + tup[2] if is_err else "Completed", None, None, None, None, None, None, None, None, None])
for log_result in node_results:
for ret_key, ret_value in log_result.items():
if ret_key is not None:
error_code_info = OB_RET_DICT.get(ret_key, "")
if len(error_code_info) > 3:
is_empty = False
t.append([node, "Error:" + tup[2] if is_err else "Completed", ret_value["file_name"], ret_key, error_code_info[1], ret_value["count"]])
t_details.append(
[
node,
"Error:" + tup[2] if is_err else "Completed",
ret_value["file_name"],
ret_key,
error_code_info[1],
ret_value["count"],
error_code_info[2],
error_code_info[3],
ret_value["first_found_time"],
ret_value["last_found_time"],
str(ret_value["trace_id_list"]),
]
)
if is_empty:
t.append([node, "\033[32mPASS\033[0m", None, None, None, None])
t_details.append([node, "\033[32mPASS\033[0m", None, None, None, None, None, None, None, None, None])
title = "\nAnalyze OceanBase Offline Log Summary:\n" if is_files else "\nAnalyze OceanBase Online Log Summary:\n"
t.sort(key=lambda x: (x[0], x[1], x[2], x[3]), reverse=False)
t_details.sort(key=lambda x: (x[0], x[1], x[2], x[3]), reverse=False)
return title, field_names, t, t_details
is_err = tup[1]
consume_time = tup[3]
pack_path = tup[4]
summary_tab.append((node, "Error:" + tup[2] if is_err else "Completed", "{0} s".format(consume_time), pack_path))
return "\nAnalyze Ob Log Summary:\n" + tabulate.tabulate(summary_tab, headers=field_names, tablefmt="grid", showindex=False)
