From b66a0f91c5aa1b8db84ed7e7ec8444e605089d63 Mon Sep 17 00:00:00 2001 From: Jaganathan Anbalagan Date: Fri, 21 Jul 2023 07:45:18 -0700 Subject: [PATCH 1/7] Signed-off-by: Jaganathan Anbalagan Refer to the HLD for problem description and design. HLD: https://github.com/sonic-net/SONiC/pull/1258 --- sonic-xcvrd/xcvrd/pm_mgr.py | 465 ++++++++++++++++++ sonic-xcvrd/xcvrd/xcvrd.py | 157 +++--- .../xcvrd/xcvrd_utilities/port_mapping.py | 64 ++- .../xcvrd_utilities/xcvr_table_helper.py | 61 +++ 4 files changed, 636 insertions(+), 111 deletions(-) create mode 100644 sonic-xcvrd/xcvrd/pm_mgr.py create mode 100644 sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py diff --git a/sonic-xcvrd/xcvrd/pm_mgr.py b/sonic-xcvrd/xcvrd/pm_mgr.py new file mode 100644 index 000000000..f16870ce3 --- /dev/null +++ b/sonic-xcvrd/xcvrd/pm_mgr.py @@ -0,0 +1,465 @@ +#!/usr/bin/env python3 + +""" + Performance Monitoring statistics manager +""" + +try: + import ast + import copy + import sys + import threading + import time + import traceback + + from swsscommon import swsscommon + from .xcvrd_utilities import sfp_status_helper + from .xcvrd_utilities.xcvr_table_helper import XcvrTableHelper + from .xcvrd_utilities import port_mapping +except ImportError as e: + raise ImportError(str(e) + " - required module not found") + +# +# Constants ==================================================================== +# + +PM_INFO_UPDATE_PERIOD_SECS = 60 + +MAX_NUM_60SEC_WINDOW = 15 +MAX_NUM_15MIN_WINDOW = 12 +MAX_NUM_24HRS_WINDOW = 2 + +WINDOW_60SEC_START_NUM = 1 +WINDOW_15MIN_START_NUM = 16 +WINDOW_24HRS_START_NUM = 28 + +WINDOW_60SEC_END_NUM = 15 +WINDOW_15MIN_END_NUM = 27 +WINDOW_24HRS_END_NUM = 29 + +TIME_60SEC_IN_SECS = 60 +TIME_15MIN_IN_SECS = 900 +TIME_24HRS_IN_SECS = 86400 + +class PmUpdateTask(threading.Thread): + + # Subscribe to below table in Redis DB + PORT_TBL_MAP = [ + {'STATE_DB': 'TRANSCEIVER_INFO', 'FILTER': ['media_interface_code']}, + ] + + def __init__(self, namespaces, port_mapping_data, main_thread_stop_event, platform_chassis, helper_logger, process_start_option, pm_interval): + threading.Thread.__init__(self) + self.name = "PmUpdateTask" + self.exc = None + self.task_stopping_event = threading.Event() + self.main_thread_stop_event = main_thread_stop_event + self.port_mapping_data = copy.deepcopy(port_mapping_data) + self.namespaces = namespaces + self.pm_interval = pm_interval + self.helper_logger = helper_logger + self.platform_chassis = platform_chassis + self.process_start_option = process_start_option + self.xcvr_table_helper = XcvrTableHelper(namespaces) + #Port numbers that gets iterated for PM update. 
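+        # Only logical ports whose TRANSCEIVER_INFO entry reports a ZR media
+        # interface code are added to this list (see on_port_update_event).
+        # Each listed port gets 29 window slots in the PM window stats table:
+        # windows 1-15 hold 60-second samples, windows 16-27 hold 15-minute
+        # samples and windows 28-29 hold 24-hour samples (per the constants above).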
+        self.pm_port_list = []
+
+    def log_notice(self, message):
+        self.helper_logger.log_notice("PM: {}".format(message))
+
+    def log_warning(self, message):
+        self.helper_logger.log_warning("PM: {}".format(message))
+
+    def log_error(self, message):
+        self.helper_logger.log_error("PM: {}".format(message))
+
+    def get_transceiver_pm(self, physical_port):
+        if self.platform_chassis is not None:
+            try:
+                return self.platform_chassis.get_sfp(physical_port).get_transceiver_pm()
+            except NotImplementedError:
+                pass
+        return {}
+
+    def get_port_admin_status(self, lport):
+        admin_status = 'down'
+
+        asic_index = self.port_mapping_data.get_asic_id_for_logical_port(lport)
+        cfg_port_tbl = self.xcvr_table_helper.get_cfg_port_tbl(asic_index)
+
+        found, port_info = cfg_port_tbl.get(lport)
+        if found:
+            admin_status = dict(port_info).get('admin_status', 'down')
+        return admin_status
+
+    def on_port_update_event(self, port_change_event):
+        """Invoked when there is a change in TRANSCEIVER_INFO for the port.
+           Initialize the PM window slots to their default value when the TRANSCEIVER_INFO table entry is created.
+           Delete the PM window slots when the TRANSCEIVER_INFO table entry is deleted.
+
+        Args:
+            port_change_event (object): port change event
+        """
+        if (port_change_event.event_type
+                not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]):
+            return
+
+        lport = port_change_event.port_name
+        pport = port_change_event.port_index
+        asic_id = port_change_event.asic_id
+
+        # Skip if it's not a physical port
+        if not lport.startswith('Ethernet'):
+            return
+
+        # Skip if the physical index is not available
+        if pport is None:
+            return
+
+        if port_change_event.port_dict is None:
+            return
+
+        physical_port = self.port_mapping_data.get_logical_to_physical(lport)[0]
+        self.log_notice("PM port event type:{} for lport: {} table name: {}".format(port_change_event.event_type, lport, port_change_event.table_name))
+        if port_change_event.event_type == port_change_event.PORT_SET:
+            if (True if "ZR" not in str(port_change_event.port_dict.get('media_interface_code', None)) else False):
+                return
+
+            if not self.platform_chassis.get_sfp(physical_port).get_presence():
+                self.log_error("Change in TRANSCEIVER_INFO table for the port {}, but transceiver is not present".format(lport))
+                return
+
+            # Update the PM port list
+            if lport not in self.pm_port_list:
+                self.pm_port_list.append(lport)
+            else:
+                self.log_notice("Port {} already present in PM port list; ignoring duplicate event".format(lport))
+                return
+            pm_win_dict = {}
+            # Set the TRANSCEIVER_PM_WINDOW_STATS keys for the port with default value
+            for key in ["window{}".format(win_num) for win_num in range(WINDOW_60SEC_START_NUM, WINDOW_24HRS_END_NUM + 1)]:
+                pm_win_dict[key] = "N/A"
+
+            # Update the table in the state DB
+            state_pm_win_stats_tbl = self.xcvr_table_helper.get_pm_window_stats_tbl(asic_id)
+            fvs = swsscommon.FieldValuePairs([(k, v) for k, v in pm_win_dict.items()])
+            state_pm_win_stats_tbl.set(lport, fvs)
+            self.log_notice("Port add event:{} added to pm port list".format(port_change_event.port_name))
+        elif port_change_event.event_type == port_change_event.PORT_DEL and port_change_event.table_name == 'TRANSCEIVER_INFO':
+
+            if self.platform_chassis.get_sfp(physical_port).get_presence():
+                self.log_error("TRANSCEIVER_INFO table entry for the port {} is deleted but the transceiver is present".format(lport))
+                return
+            # Remove the port from the PM port list and delete the TRANSCEIVER_PM_WINDOW_STATS entry for the port
+            self.pm_port_list.remove(lport)
+            state_pm_win_stats_tbl = 
self.xcvr_table_helper.get_pm_window_stats_tbl(asic_id) + if state_pm_win_stats_tbl: + state_pm_win_stats_tbl._del(lport) + self.log_notice("Port delete event:{} deleted from pm port list".format(port_change_event.port_name)) + + # Update the PM statistics in string format to respective window key to + # TRANSCEIVER_PM_WINDOW_STATS_TABLE in state DB. + def set_pm_stats_in_pm_win_stats_tbl(self, asic, lport, window, pm_data_str): + state_pm_win_stats_tbl = self.xcvr_table_helper.get_pm_window_stats_tbl(asic) + fvs = swsscommon.FieldValuePairs([(window, str(pm_data_str))]) + state_pm_win_stats_tbl.set(lport, fvs) + + # Retrieve the PM statistics from respective window key from. + # TRANSCEIVER_PM_WINDOW_STATS_TABLE - state DB. + def get_pm_stats_in_pm_win_stats_tbl(self, asic, lport, window): + state_pm_win_stats_tbl = self.xcvr_table_helper.get_pm_window_stats_tbl(asic) + status, fvs = state_pm_win_stats_tbl.get(str(lport)) + stats_tbl_tuple = dict((k, v) for k, v in fvs) + if stats_tbl_tuple != "": + return (stats_tbl_tuple.get(window, "")) + else: + return None + + # + # input args: asic index, logical port, start and end window number of a PM window granularity, + # PM window granularity in secs, pm stats read from module. + # + # Algorithm to sample and update the PM stats to the appropriate PM window in TRANSCEIVER_PM_WINDOW_STATS_TABLE. + def pm_window_update_to_DB(self, asic, lport, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data): + window_num = start_window + while window_num < (end_window + 1): + #Retrieve PM data from DB + pm_data = {} + pm_data = self.get_pm_stats_in_pm_win_stats_tbl(lport, asic, "window"+str(window_num)) + if pm_data == 'N/A' and window_num == start_window: + pm_data_dict = {} + for key, value in pm_hw_data.items(): + pm_data_dict[key] = value + #First PM data read from the module + pm_data_dict['pm_win_start_time'] = pm_hw_data.get('pm_win_end_time') + pm_data_dict['pm_win_end_time'] = pm_hw_data.get('pm_win_end_time') + pm_data_dict['pm_win_current'] = 'true' + pm_data_str = str(pm_data_dict) + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(window_num), pm_data_str) + break; + elif pm_data != 'N/A': + #retrieve pm_win_current index pm data from DB-the start of PM time window slot + pm_data_dic = ast.literal_eval(pm_data) + + if (float(pm_data_dic.get('pm_win_end_time')) == float(pm_data_dic.get('pm_win_start_time'))): + #discard the 1st set of PM data retrieved from the module. + pm_data_dic1 = {} + for key, value in pm_hw_data.items(): + pm_data_dic1[key] = value + #First PM data read from the module + pm_data_dic1['pm_win_start_time'] = pm_data_dic.get('pm_win_start_time') + pm_data_dic1['pm_win_end_time'] = pm_hw_data.get('pm_win_end_time') + pm_data_dic1['pm_win_current'] = 'true' + pm_data_str = str(pm_data_dic1) + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(window_num), pm_data_str) + break + + if (float(pm_data_dic.get('pm_win_end_time')) - float(pm_data_dic.get('pm_win_start_time'))) < pm_win_itrvl_in_secs: + sampled_pm_data = {} + #Sample the data between pm_data and pm_hw_data. + sampled_pm_data = self.pm_data_sampling(pm_data_dic, pm_hw_data) + sampled_pm_data['pm_win_end_time'] = pm_hw_data.get('pm_win_end_time') + sampled_pm_data['pm_win_start_time'] = pm_data_dic.get('pm_win_start_time') + sampled_pm_data['pm_win_current'] = pm_data_dic.get('pm_win_current') + #update the window with sampled data in DB. 
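+                    # "Sampling" here merges the stored window with the fresh hardware
+                    # read: pm_data_sampling() keeps the min of the *_min fields, the max
+                    # of the *_max fields and averages the *_avg fields of the two sets.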
+ self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(window_num), sampled_pm_data) + break + + #set current to false in current pm window as this time window is completed. + pm_data_dic['pm_win_current'] = "false" + #update the current field in the current window_num key of pm_data + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(window_num), pm_data_dic) + + #retrieve next pm window number and its data + if window_num == end_window: + next_window_num = start_window + else: + next_window_num = window_num + 1 + next_pm_data = {} + next_pm_data_dic = {} + next_pm_data = self.get_pm_stats_in_pm_win_stats_tbl(lport, asic, "window"+str(next_window_num)) + # If next window pm_data is empty, then fill the next window with pm_hw_data and + # set the curremt, start and end time, + # update the pm_win_current to false in current_pm_data and write to DB + if next_pm_data == 'N/A': + # fill the hw data to the next window + for key, value in pm_hw_data.items(): + next_pm_data_dic[key] = value + + next_pm_data_dic['pm_win_current'] = "true" + next_pm_data_dic['pm_win_start_time'] = pm_data_dic.get('pm_win_end_time') + next_pm_data_dic['pm_win_end_time'] = pm_hw_data.get('pm_win_end_time') + #convert to string and update current value of next pm data to DB + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(next_window_num), str(next_pm_data_dic)) + break; + else: + #next window pm data is not empty, check which window is current and udpate next window + next_pm_data_dic = ast.literal_eval(next_pm_data) + + if (float(next_pm_data_dic.get('pm_win_end_time')) == float(next_pm_data_dic.get('pm_win_start_time'))): + #discard the 1st set of PM data retrieved from the module. + next_pm_data_dic1 = {} + next_pm_data_dic1_str ="" + for key, value in pm_hw_data.items(): + next_pm_data_dic1[key] = value + #First PM data read from the module + next_pm_data_dic1['pm_win_start_time'] = next_pm_data.get('pm_win_start_time') + next_pm_data_dic1['pm_win_end_time'] = pm_hw_data.get('pm_win_end_time') + next_pm_data_dic1['pm_win_current'] = 'true' + next_pm_data_str = str(next_pm_data_dic1) + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(next_window_num), next_pm_data_str) + break + + + if (float(next_pm_data_dic.get('pm_win_end_time')) - float(next_pm_data_dic.get('pm_win_start_time'))) < pm_win_itrvl_in_secs: + #Sample the data between pm_data and pm_hw_data. + sampled_pm_data = self.pm_data_sampling(next_pm_data_dic, pm_hw_data) + sampled_pm_data['pm_win_end_time'] = pm_hw_data.get('pm_win_end_time') + sampled_pm_data['pm_win_start_time'] = pm_data_dic.get('pm_win_end_time') + sampled_pm_data['pm_win_current'] = next_pm_data_dic.get('pm_win_current') + #update the window with sampled data in DB. + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(next_window_num), str(sampled_pm_data)) + break + else: + if float(pm_data_dic.get('pm_win_start_time')) < float(next_pm_data_dic.get('pm_win_start_time')): + window_num = window_num + 1 + continue + else: + #Fill the pm_hw_data into DB with key:window+1, with start and end time updated . 
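+                            # The data already stored in the next slot is older than the
+                            # current window (wrap-around), so overwrite it with the fresh
+                            # hardware read and make it the new current window; the window
+                            # that just completed is marked pm_win_current 'false' below.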
+ next_pm_data_dic = {} + for key, value in pm_hw_data.items(): + next_pm_data_dic[key] = value + + next_pm_data_dic['pm_win_current'] = "true" + next_pm_data_dic['pm_win_start_time'] = pm_data_dic.get('pm_win_end_time') + next_pm_data_dic['pm_win_end_time'] = pm_hw_data.get('pm_win_end_time') + #convert to string and update current value of prev pm data to DB + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(next_window_num), next_pm_data_dic) + #pm_win_stat_from_db["window{}".format(next_window_num)] = str(next_pm_data_dic) + + #update the current field in the current window_num key of pm_data + pm_data_dic['pm_win_current'] = 'false' + self.set_pm_stats_in_pm_win_stats_tbl(lport, asic, "window{}".format(window_num), str(pm_data_dic)) + break; + + + def average_of_two_val(self, val1, val2): + return((val1+val2)/2) + + # perform min, max and average of relevant PM stats parameter between two dictionary + def pm_data_sampling(self, pm_data_dict1, pm_data_dict2): + sampled_pm_dict = {} + try: + sampled_pm_dict['prefec_ber_avg'] = self.average_of_two_val(float(pm_data_dict1['prefec_ber_avg']), float(pm_data_dict2['prefec_ber_avg'])) + sampled_pm_dict['prefec_ber_min'] = min(float(pm_data_dict1['prefec_ber_min']), float(pm_data_dict2['prefec_ber_min'])) + sampled_pm_dict['prefec_ber_max'] = max(float(pm_data_dict1['prefec_ber_max']), float(pm_data_dict2['prefec_ber_max'])) + + sampled_pm_dict['uncorr_frames_avg'] = self.average_of_two_val(float(pm_data_dict1['uncorr_frames_avg']), float(pm_data_dict2['uncorr_frames_avg'])) + sampled_pm_dict['uncorr_frames_min'] = min(float(pm_data_dict1['uncorr_frames_min']), float(pm_data_dict2['uncorr_frames_min'])) + sampled_pm_dict['uncorr_frames_max'] = max(float(pm_data_dict1['uncorr_frames_max']), float(pm_data_dict2['uncorr_frames_max'])) + + sampled_pm_dict['cd_avg'] = self.average_of_two_val(float(pm_data_dict1['cd_avg']), float(pm_data_dict2['cd_avg'])) + sampled_pm_dict['cd_min'] = min(float(pm_data_dict1['cd_min']), float(pm_data_dict2['cd_min'])) + sampled_pm_dict['cd_max'] = max(float(pm_data_dict1['cd_max']), float(pm_data_dict2['cd_max'])) + + sampled_pm_dict['dgd_avg'] = self.average_of_two_val(float(pm_data_dict1['dgd_avg']), float(pm_data_dict2['dgd_avg'])) + sampled_pm_dict['dgd_min'] = min(float(pm_data_dict1['dgd_min']), float(pm_data_dict2['dgd_min'])) + sampled_pm_dict['dgd_max'] = max(float(pm_data_dict1['dgd_max']), float(pm_data_dict2['dgd_max'])) + + sampled_pm_dict['sopmd_avg'] = self.average_of_two_val(float(pm_data_dict1['sopmd_avg']), float(pm_data_dict2['sopmd_avg'])) + sampled_pm_dict['sopmd_min'] = min(float(pm_data_dict1['sopmd_min']), float(pm_data_dict2['sopmd_min'])) + sampled_pm_dict['sopmd_max'] = max(float(pm_data_dict1['sopmd_max']), float(pm_data_dict2['sopmd_max'])) + + sampled_pm_dict['pdl_avg'] = self.average_of_two_val(float(pm_data_dict1['pdl_avg']), float(pm_data_dict2['pdl_avg'])) + sampled_pm_dict['pdl_min'] = min(float(pm_data_dict1['pdl_min']), float(pm_data_dict2['pdl_min'])) + sampled_pm_dict['pdl_max'] = max(float(pm_data_dict1['pdl_max']), float(pm_data_dict2['pdl_max'])) + + sampled_pm_dict['osnr_avg'] = self.average_of_two_val(float(pm_data_dict1['osnr_avg']), float(pm_data_dict2['osnr_avg'])) + sampled_pm_dict['osnr_min'] = min(float(pm_data_dict1['osnr_min']), float(pm_data_dict2['osnr_min'])) + sampled_pm_dict['osnr_max'] = max(float(pm_data_dict1['osnr_max']), float(pm_data_dict2['osnr_max'])) + + sampled_pm_dict['esnr_avg'] = 
self.average_of_two_val(float(pm_data_dict1['esnr_avg']), float(pm_data_dict2['esnr_avg']))
+            sampled_pm_dict['esnr_min'] = min(float(pm_data_dict1['esnr_min']), float(pm_data_dict2['esnr_min']))
+            sampled_pm_dict['esnr_max'] = max(float(pm_data_dict1['esnr_max']), float(pm_data_dict2['esnr_max']))
+
+            sampled_pm_dict['cfo_avg'] = self.average_of_two_val(float(pm_data_dict1['cfo_avg']), float(pm_data_dict2['cfo_avg']))
+            sampled_pm_dict['cfo_min'] = min(float(pm_data_dict1['cfo_min']), float(pm_data_dict2['cfo_min']))
+            sampled_pm_dict['cfo_max'] = max(float(pm_data_dict1['cfo_max']), float(pm_data_dict2['cfo_max']))
+
+            sampled_pm_dict['evm_avg'] = self.average_of_two_val(float(pm_data_dict1['evm_avg']), float(pm_data_dict2['evm_avg']))
+            sampled_pm_dict['evm_min'] = min(float(pm_data_dict1['evm_min']), float(pm_data_dict2['evm_min']))
+            sampled_pm_dict['evm_max'] = max(float(pm_data_dict1['evm_max']), float(pm_data_dict2['evm_max']))
+
+            sampled_pm_dict['soproc_avg'] = self.average_of_two_val(float(pm_data_dict1['soproc_avg']), float(pm_data_dict2['soproc_avg']))
+            sampled_pm_dict['soproc_min'] = min(float(pm_data_dict1['soproc_min']), float(pm_data_dict2['soproc_min']))
+            sampled_pm_dict['soproc_max'] = max(float(pm_data_dict1['soproc_max']), float(pm_data_dict2['soproc_max']))
+
+            sampled_pm_dict['tx_power_avg'] = self.average_of_two_val(float(pm_data_dict1['tx_power_avg']), float(pm_data_dict2['tx_power_avg']))
+            sampled_pm_dict['tx_power_min'] = min(float(pm_data_dict1['tx_power_min']), float(pm_data_dict2['tx_power_min']))
+            sampled_pm_dict['tx_power_max'] = max(float(pm_data_dict1['tx_power_max']), float(pm_data_dict2['tx_power_max']))
+
+            sampled_pm_dict['rx_tot_power_avg'] = self.average_of_two_val(float(pm_data_dict1['rx_tot_power_avg']), float(pm_data_dict2['rx_tot_power_avg']))
+            sampled_pm_dict['rx_tot_power_min'] = min(float(pm_data_dict1['rx_tot_power_min']), float(pm_data_dict2['rx_tot_power_min']))
+            sampled_pm_dict['rx_tot_power_max'] = max(float(pm_data_dict1['rx_tot_power_max']), float(pm_data_dict2['rx_tot_power_max']))
+
+            sampled_pm_dict['rx_sig_power_avg'] = self.average_of_two_val(float(pm_data_dict1['rx_sig_power_avg']), float(pm_data_dict2['rx_sig_power_avg']))
+            sampled_pm_dict['rx_sig_power_min'] = min(float(pm_data_dict1['rx_sig_power_min']), float(pm_data_dict2['rx_sig_power_min']))
+            sampled_pm_dict['rx_sig_power_max'] = max(float(pm_data_dict1['rx_sig_power_max']), float(pm_data_dict2['rx_sig_power_max']))
+        except ValueError as e:
+            self.log_error("Value Error: {}".format(e))
+        return sampled_pm_dict
+
+    def is_current_window(self, pm_data_dict):
+        # 'pm_win_current' is stored as the string 'true'/'false' in the window data
+        return pm_data_dict.get('pm_win_current', 'false') == 'true'
+
+    def beautify_pm_info_dict(self, pm_info_dict, physical_port):
+        for k, v in pm_info_dict.items():
+            if type(v) is str:
+                continue
+            pm_info_dict[k] = str(v)
+
+    def task_worker(self):
+        self.log_notice("Start Performance monitoring for 400G ZR modules")
+        sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, self, self.PORT_TBL_MAP)
+
+        # Start loop to update PM stats info to DB periodically for PM interval time, default is 60sec
+        while not self.task_stopping_event.wait(self.pm_interval):
+
+            # Handle port change event from main thread
+            port_mapping.handle_port_update_event(sel, asic_context, self.task_stopping_event, self, self.on_port_update_event)
+            for lport in self.pm_port_list:
+                asic_index = self.port_mapping_data.get_asic_id_for_logical_port(lport)
+                if asic_index is None:
+                    continue
+
+                
physical_port = self.port_mapping_data.get_logical_to_physical(lport)[0] + if not physical_port: + continue + + #skip if sfp obj or sfp is not present + sfp = self.platform_chassis.get_sfp(physical_port) + if sfp is not None: + if not sfp.get_presence(): + continue + else: + continue + + if self.get_port_admin_status(lport) != 'up': + continue + + if not sfp_status_helper.detect_port_in_error_status(lport, self.xcvr_table_helper.get_status_tbl(asic_index)): + try: + pm_hw_data = self.get_transceiver_pm(physical_port) + except (KeyError, TypeError) as e: + #continue to process next port since execption could be raised due to port reset, transceiver removal + self.log_warning("Got exception {} while reading pm stats for port {}, ignored".format(repr(e), lport)) + continue + + if not pm_hw_data: + continue + + self.beautify_pm_info_dict(pm_hw_data, physical_port) + + # Update 60Sec PM time window slots + start_window = WINDOW_60SEC_START_NUM + end_window = WINDOW_60SEC_END_NUM + pm_win_itrvl_in_secs = TIME_60SEC_IN_SECS + self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) + + # Update 15min PM time window slots + start_window = WINDOW_15MIN_START_NUM + end_window = WINDOW_15MIN_END_NUM + pm_win_itrvl_in_secs = TIME_15MIN_IN_SECS + self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) + + # Update 24hrs PM time window slots + start_window = WINDOW_24HRS_START_NUM + end_window = WINDOW_24HRS_END_NUM + pm_win_itrvl_in_secs = TIME_24HRS_IN_SECS + self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) + + self.log_notice("Stop Performance Monitoring") + + def run(self): + if self.task_stopping_event.is_set(): + return + + try: + self.task_worker() + except Exception as e: + self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + self.log_error(tb_line_split) + self.exc = e + self.main_thread_stop_event.set() + + def join(self): + self.task_stopping_event.set() + threading.Thread.join(self) + if self.exc: + raise self.exc + + diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 6fc59a00f..0c5d4fcb9 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -27,6 +27,8 @@ from .xcvrd_utilities import sfp_status_helper from .xcvrd_utilities import port_mapping + from .xcvrd_utilities.xcvr_table_helper import XcvrTableHelper + from .pm_mgr import PmUpdateTask except ImportError as e: raise ImportError(str(e) + " - required module not found") @@ -57,6 +59,9 @@ STATE_MACHINE_UPDATE_PERIOD_MSECS = 60000 TIME_FOR_SFP_READY_SECS = 1 +PM_INTERVAL_TIME = "pm_interval_time" +DEFAULT_PM_INTERVAL_60SEC = 60 + EVENT_ON_ALL_SFP = '-1' # events definition SYSTEM_NOT_READY = 'system_not_ready' @@ -87,6 +92,11 @@ POWER_UNIT = 'dBm' BIAS_UNIT = 'mA' +USR_SHARE_SONIC_PATH = "/usr/share/sonic" +HOST_DEVICE_PATH = USR_SHARE_SONIC_PATH + "/device" +CONTAINER_PLATFORM_PATH = USR_SHARE_SONIC_PATH + "/platform" +PLATFORM_PMON_CTRL_FILENAME = "pmon_daemon_control.json" + g_dict = {} # Global platform specific sfputil class instance platform_sfputil = None @@ -202,15 +212,6 @@ def _wrapper_get_transceiver_status(physical_port): return {} -def 
_wrapper_get_transceiver_pm(physical_port): - if platform_chassis is not None: - try: - return platform_chassis.get_sfp(physical_port).get_transceiver_pm() - except NotImplementedError: - pass - return {} - - # Soak SFP insert event until management init completes def _wrapper_soak_sfp_insert_event(sfp_insert_events, port_dict): for key, value in list(port_dict.items()): @@ -515,35 +516,6 @@ def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event= helper_logger.log_error("This functionality is currently not implemented for this platform") sys.exit(NOT_IMPLEMENTED_ERROR) -# Update port pm info in db - - -def post_port_pm_info_to_db(logical_port_name, port_mapping, table, stop_event=threading.Event(), pm_info_cache=None): - for physical_port, physical_port_name in get_physical_port_name_dict(logical_port_name, port_mapping).items(): - if stop_event.is_set(): - break - - if not _wrapper_get_presence(physical_port): - continue - - if pm_info_cache is not None and physical_port in pm_info_cache: - # If cache is enabled and pm info is in cache, just read from cache, no need read from EEPROM - pm_info_dict = pm_info_cache[physical_port] - else: - pm_info_dict = _wrapper_get_transceiver_pm(physical_port) - if pm_info_cache is not None: - # If cache is enabled, put dom information to cache - pm_info_cache[physical_port] = pm_info_dict - if pm_info_dict is not None: - # Skip if empty (i.e. get_transceiver_pm API is not applicable for this xcvr) - if not pm_info_dict: - continue - beautify_pm_info_dict(pm_info_dict, physical_port) - fvs = swsscommon.FieldValuePairs([(k, v) for k, v in pm_info_dict.items()]) - table.set(physical_port_name, fvs) - else: - return SFP_EEPROM_NOT_READY - # Delete port dom/sfp info from db @@ -556,8 +528,6 @@ def del_port_sfp_dom_info_from_db(logical_port_name, port_mapping, int_tbl, dom_ dom_tbl._del(physical_port_name) if dom_threshold_tbl: dom_threshold_tbl._del(physical_port_name) - if pm_tbl: - pm_tbl._del(physical_port_name) except NotImplementedError: helper_logger.log_error("This functionality is currently not implemented for this platform") @@ -1764,12 +1734,6 @@ def task_worker(self): #continue to process next port since execption could be raised due to port reset, transceiver removal helper_logger.log_warning("Got exception {} while processing transceiver status hw for port {}, ignored".format(repr(e), logical_port_name)) continue - try: - post_port_pm_info_to_db(logical_port_name, self.port_mapping, self.xcvr_table_helper.get_pm_tbl(asic_index), self.task_stopping_event, pm_info_cache=pm_info_cache) - except (KeyError, TypeError) as e: - #continue to process next port since execption could be raised due to port reset, transceiver removal - helper_logger.log_warning("Got exception {} while processing pm info for port {}, ignored".format(repr(e), logical_port_name)) - continue helper_logger.log_info("Stop DOM monitoring loop") @@ -2359,6 +2323,7 @@ def __init__(self, log_identifier, skip_cmis_mgr=False): self.stop_event = threading.Event() self.sfp_error_event = threading.Event() self.skip_cmis_mgr = skip_cmis_mgr + self.pm_interval = DEFAULT_PM_INTERVAL_60SEC self.namespaces = [''] self.threads = [] @@ -2487,6 +2452,48 @@ def deinit(self): del globals()['platform_chassis'] + def load_feature_flags(self): + """ + Load feature enable/skip flags from platform files. 
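+
+        Currently the only flag read is the optional "pm_interval_time" entry in
+        pmon_daemon_control.json, which may override the default PM polling
+        interval of 60 seconds with a value of 30, 60 or 120 seconds.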
+ """ + platform_pmon_ctrl_file_path = self.get_platform_pmon_ctrl_file_path() + if platform_pmon_ctrl_file_path is not None: + # Load the JSON file + with open(platform_pmon_ctrl_file_path) as file: + data = json.load(file) + self.pm_interval = DEFAULT_PM_INTERVAL_60SEC + if PM_INTERVAL_TIME in data: + pm_interval = int(data[PM_INTERVAL_TIME]) + if(( pm_interval == 30) or (pm_interval == 60) or (pm_interval == 120)): + self.pm_interval = pm_interval + + def get_platform_pmon_ctrl_file_path(self): + """ + Get the platform PMON control file path. + + The method generates a list of potential file paths, prioritizing the container platform path. + It then checks these paths in order to find an existing file. + + Returns: + str: The first found platform PMON control file path. + None: If no file path exists. + """ + platform_env_conf_path_candidates = [] + + platform_env_conf_path_candidates.append( + os.path.join(CONTAINER_PLATFORM_PATH, PLATFORM_PMON_CTRL_FILENAME)) + + platform = device_info.get_platform() + if platform: + platform_env_conf_path_candidates.append( + os.path.join(HOST_DEVICE_PATH, platform, PLATFORM_PMON_CTRL_FILENAME)) + + for platform_env_conf_file_path in platform_env_conf_path_candidates: + if os.path.isfile(platform_env_conf_file_path): + return platform_env_conf_file_path + + return None + # Run daemon def run(self): @@ -2495,6 +2502,7 @@ def run(self): # Start daemon initialization sequence port_mapping_data = self.init() + self.load_feature_flags() # Start the CMIS manager cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.stop_event, self.skip_cmis_mgr) if not self.skip_cmis_mgr: @@ -2511,6 +2519,11 @@ def run(self): sfp_state_update.start() self.threads.append(sfp_state_update) + # Start the Performance monitoring update thread + pm_mgr = PmUpdateTask(self.namespaces, port_mapping_data, self.stop_event, platform_chassis, helper_logger, 0, self.pm_interval) + pm_mgr.start() + self.threads.append(pm_mgr) + # Start main loop self.log_notice("Start daemon main loop with thread count {}".format(len(self.threads))) for thread in self.threads: @@ -2547,6 +2560,11 @@ def run(self): if sfp_state_update.is_alive(): sfp_state_update.raise_exception() sfp_state_update.join() + + # Stop the SFF manager + if pm_mgr is not None: + if pm_mgr.is_alive(): + pm_mgr.join() # Start daemon deinitialization sequence self.deinit() @@ -2557,53 +2575,6 @@ def run(self): sys.exit(SFP_SYSTEM_ERROR) -class XcvrTableHelper: - def __init__(self, namespaces): - self.int_tbl, self.dom_tbl, self.dom_threshold_tbl, self.status_tbl, self.app_port_tbl, \ - self.cfg_port_tbl, self.state_port_tbl, self.pm_tbl = {}, {}, {}, {}, {}, {}, {}, {} - self.state_db = {} - self.cfg_db = {} - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - self.state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - self.int_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_INFO_TABLE) - self.dom_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) - self.dom_threshold_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_THRESHOLD_TABLE) - self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE) - self.pm_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_PM_TABLE) - self.state_port_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], swsscommon.STATE_PORT_TABLE_NAME) - appl_db = daemon_base.db_connect("APPL_DB", namespace) - 
self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) - self.cfg_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) - self.cfg_port_tbl[asic_id] = swsscommon.Table(self.cfg_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) - - def get_intf_tbl(self, asic_id): - return self.int_tbl[asic_id] - - def get_dom_tbl(self, asic_id): - return self.dom_tbl[asic_id] - - def get_dom_threshold_tbl(self, asic_id): - return self.dom_threshold_tbl[asic_id] - - def get_status_tbl(self, asic_id): - return self.status_tbl[asic_id] - - def get_pm_tbl(self, asic_id): - return self.pm_tbl[asic_id] - - def get_app_port_tbl(self, asic_id): - return self.app_port_tbl[asic_id] - - def get_state_db(self, asic_id): - return self.state_db[asic_id] - - def get_cfg_port_tbl(self, asic_id): - return self.cfg_port_tbl[asic_id] - - def get_state_port_tbl(self, asic_id): - return self.state_port_tbl[asic_id] - # # Main ========================================================================= # diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index c7965f343..96ac19db0 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -1,19 +1,33 @@ +import threading from sonic_py_common import daemon_base from sonic_py_common import multi_asic from sonic_py_common.interface import backplane_prefix, inband_prefix, recirc_prefix from swsscommon import swsscommon SELECT_TIMEOUT_MSECS = 1000 +DEFAULT_PORT_TBL_MAP = [ + {'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME}, + {'STATE_DB': 'TRANSCEIVER_INFO'}, + {'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']}, +] + class PortChangeEvent: + # thread_local_data will be used to hold class-scope thread-safe variables + # (i.e. global variable to the local thread), such as + # thread_local_data.PORT_EVENT. thread_local_data.PORT_EVENT dict will be + # initialized in subscribe_port_update_event, and used to store the latest + # port change event for each key, to avoid duplicate event processing. + thread_local_data = threading.local() PORT_ADD = 0 PORT_REMOVE = 1 PORT_SET = 2 PORT_DEL = 3 PORT_EVENT = {} - def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): + def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None, + db_name=None, table_name=None): # Logical port name, e.g. Ethernet0 self.port_name = port_name # Physical port index, equals to "index" field of PORT table in CONFIG_DB @@ -24,6 +38,8 @@ def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): self.event_type = event_type # Port config dict self.port_dict = port_dict + self.db_name = db_name + self.table_name = table_name def __str__(self): return '{} - name={} index={} asic_id={}'.format('Add' if self.event_type == self.PORT_ADD else 'Remove', @@ -107,18 +123,17 @@ def subscribe_port_config_change(namespaces): sel.addSelectable(port_tbl) return sel, asic_context -def subscribe_port_update_event(namespaces, logger): +def subscribe_port_update_event(namespaces, logger, port_tbl_map=DEFAULT_PORT_TBL_MAP): """ Subscribe to a particular DB's table and listen to only interested fields Format : { : , , , .. 
} where only field update will be received """ - port_tbl_map = [ - {'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME}, - {'STATE_DB': 'TRANSCEIVER_INFO'}, - {'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']}, - ] - + # PORT_EVENT dict stores the latest port change event for each key, to avoid + # duplicate event processing. key in PORT_EVENT dict would be a combination + # of (key of redis DB entry, port_tbl.db_name, port_tbl.table_name) + PortChangeEvent.thread_local_data.PORT_EVENT = {} + sel = swsscommon.Select() asic_context = {} for d in port_tbl_map: @@ -145,14 +160,18 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_ """ Select PORT update events, notify the observers upon a port update in CONFIG_DB or a XCVR insertion/removal in STATE_DB + + Returns: + bool: True if there's at least one update event; False if there's no update event. """ + has_event = False if not stop_event.is_set(): (state, _) = sel.select(SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: - return + return has_event if state != swsscommon.Select.OBJECT: logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') - return + return has_event port_event_cache = {} for port_tbl in asic_context.keys(): @@ -173,10 +192,13 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_ fvp['op'] = op fvp['FILTER'] = port_tbl.filter # Soak duplicate events and consider only the last event - port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp + port_event_cache[(key, port_tbl.db_name, port_tbl.table_name)] = fvp # Now apply filter over soaked events for key, fvp in port_event_cache.items(): + port_name = key[0] + db_name = key[1] + table_name = key[2] port_index = int(fvp['index']) port_change_event = None diff = {} @@ -184,32 +206,38 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_ del fvp['FILTER'] apply_filter_to_fvp(filter, fvp) - if key in PortChangeEvent.PORT_EVENT: - diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items())) + if key in PortChangeEvent.thread_local_data.PORT_EVENT: + diff = dict(set(fvp.items()) - set(PortChangeEvent.thread_local_data.PORT_EVENT[key].items())) # Ignore duplicate events if not diff: - PortChangeEvent.PORT_EVENT[key] = fvp + PortChangeEvent.thread_local_data.PORT_EVENT[key] = fvp continue - PortChangeEvent.PORT_EVENT[key] = fvp + PortChangeEvent.thread_local_data.PORT_EVENT[key] = fvp if fvp['op'] == swsscommon.SET_COMMAND: port_change_event = PortChangeEvent(fvp['key'], port_index, fvp['asic_id'], PortChangeEvent.PORT_SET, - fvp) + fvp, + db_name, + table_name) elif fvp['op'] == swsscommon.DEL_COMMAND: port_change_event = PortChangeEvent(fvp['key'], port_index, fvp['asic_id'], PortChangeEvent.PORT_DEL, - fvp) + fvp, + db_name, + table_name) # This is the final event considered for processing logger.log_warning("*** {} handle_port_update_event() fvp {}".format( key, fvp)) if port_change_event is not None: - port_change_event_handler(port_change_event) + has_event = True + port_change_event_handler(port_change_event) + return has_event def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py new file mode 100644 index 000000000..37d51f707 --- /dev/null +++ 
b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py @@ -0,0 +1,61 @@ +try: + from sonic_py_common import daemon_base + from sonic_py_common import multi_asic + from swsscommon import swsscommon +except ImportError as e: + raise ImportError(str(e) + " - required module not found") + +TRANSCEIVER_INFO_TABLE = 'TRANSCEIVER_INFO' +TRANSCEIVER_DOM_SENSOR_TABLE = 'TRANSCEIVER_DOM_SENSOR' +TRANSCEIVER_DOM_THRESHOLD_TABLE = 'TRANSCEIVER_DOM_THRESHOLD' +TRANSCEIVER_STATUS_TABLE = 'TRANSCEIVER_STATUS' +TRANSCEIVER_PM_TABLE = 'TRANSCEIVER_PM' +TRANSCEIVER_PM_WINDOW_STATS_TABLE = 'TRANSCEIVER_PM_WINDOW_STATS_TABLE' + +class XcvrTableHelper: + def __init__(self, namespaces): + self.int_tbl, self.dom_tbl, self.dom_threshold_tbl, self.status_tbl, self.app_port_tbl, \ + self.cfg_port_tbl, self.state_port_tbl, self.pm_tbl,self.pm_window_stats_tbl = {}, {}, {}, {}, {}, {}, {}, {}, {} + self.state_db = {} + self.cfg_db = {} + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + self.state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + self.int_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_INFO_TABLE) + self.dom_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) + self.dom_threshold_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_THRESHOLD_TABLE) + self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE) + self.pm_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_PM_TABLE) + self.pm_window_stats_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_PM_WINDOW_STATS_TABLE) + self.state_port_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], swsscommon.STATE_PORT_TABLE_NAME) + appl_db = daemon_base.db_connect("APPL_DB", namespace) + self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + self.cfg_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + self.cfg_port_tbl[asic_id] = swsscommon.Table(self.cfg_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) + + def get_intf_tbl(self, asic_id): + return self.int_tbl[asic_id] + + def get_dom_tbl(self, asic_id): + return self.dom_tbl[asic_id] + + def get_dom_threshold_tbl(self, asic_id): + return self.dom_threshold_tbl[asic_id] + + def get_status_tbl(self, asic_id): + return self.status_tbl[asic_id] + + def get_app_port_tbl(self, asic_id): + return self.app_port_tbl[asic_id] + + def get_state_db(self, asic_id): + return self.state_db[asic_id] + + def get_cfg_port_tbl(self, asic_id): + return self.cfg_port_tbl[asic_id] + + def get_state_port_tbl(self, asic_id): + return self.state_port_tbl[asic_id] + + def get_pm_window_stats_tbl(self, asic_id): + return self.pm_window_stats_tbl[asic_id] From 770ab71b078276f96e7b5177fdce0aadf76cb8f9 Mon Sep 17 00:00:00 2001 From: Jaganathan Anbalagan Date: Fri, 21 Jul 2023 19:40:28 -0700 Subject: [PATCH 2/7] typo --- sonic-xcvrd/xcvrd/xcvrd.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 0c5d4fcb9..34e6a2644 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -519,7 +519,7 @@ def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event= # Delete port dom/sfp info from db -def del_port_sfp_dom_info_from_db(logical_port_name, port_mapping, int_tbl, dom_tbl, dom_threshold_tbl, pm_tbl): +def 
del_port_sfp_dom_info_from_db(logical_port_name, port_mapping, int_tbl, dom_tbl, dom_threshold_tbl): for physical_port_name in get_physical_port_name_dict(logical_port_name, port_mapping).values(): try: if int_tbl: @@ -1776,8 +1776,7 @@ def on_remove_logical_port(self, port_change_event): self.port_mapping, None, self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id), - None, - self.xcvr_table_helper.get_pm_tbl(port_change_event.asic_id)) + None) delete_port_from_status_table_hw(port_change_event.port_name, self.port_mapping, self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) @@ -2090,8 +2089,7 @@ def task_worker(self, stopping_event, sfp_error_event): del_port_sfp_dom_info_from_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_intf_tbl(asic_index), self.xcvr_table_helper.get_dom_tbl(asic_index), - self.xcvr_table_helper.get_dom_threshold_tbl(asic_index), - self.xcvr_table_helper.get_pm_tbl(asic_index)) + self.xcvr_table_helper.get_dom_threshold_tbl(asic_index)) delete_port_from_status_table_hw(logical_port, self.port_mapping, self.xcvr_table_helper.get_status_tbl(asic_index)) else: try: @@ -2118,8 +2116,7 @@ def task_worker(self, stopping_event, sfp_error_event): self.port_mapping, None, self.xcvr_table_helper.get_dom_tbl(asic_index), - self.xcvr_table_helper.get_dom_threshold_tbl(asic_index), - self.xcvr_table_helper.get_pm_tbl(asic_index)) + self.xcvr_table_helper.get_dom_threshold_tbl(asic_index)) delete_port_from_status_table_hw(logical_port, self.port_mapping, self.xcvr_table_helper.get_status_tbl(asic_index)) except (TypeError, ValueError) as e: helper_logger.log_error("{}: Got unrecognized event {}, ignored".format(logical_port, value)) @@ -2215,8 +2212,7 @@ def on_remove_logical_port(self, port_change_event): self.port_mapping, self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id), self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id), - self.xcvr_table_helper.get_dom_threshold_tbl(port_change_event.asic_id), - self.xcvr_table_helper.get_pm_tbl(port_change_event.asic_id)) + self.xcvr_table_helper.get_dom_threshold_tbl(port_change_event.asic_id)) delete_port_from_status_table_sw(port_change_event.port_name, self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) delete_port_from_status_table_hw(port_change_event.port_name, self.port_mapping, @@ -2444,8 +2440,7 @@ def deinit(self): del_port_sfp_dom_info_from_db(logical_port_name, port_mapping_data, self.xcvr_table_helper.get_intf_tbl(asic_index), self.xcvr_table_helper.get_dom_tbl(asic_index), - self.xcvr_table_helper.get_dom_threshold_tbl(asic_index), - self.xcvr_table_helper.get_pm_tbl(asic_index)) + self.xcvr_table_helper.get_dom_threshold_tbl(asic_index)) delete_port_from_status_table_sw(logical_port_name, self.xcvr_table_helper.get_status_tbl(asic_index)) delete_port_from_status_table_hw(logical_port_name, port_mapping_data, self.xcvr_table_helper.get_status_tbl(asic_index)) From 5af327d8befa14a39a8d12317e702d5a0d5fa0c1 Mon Sep 17 00:00:00 2001 From: Jaganathan Anbalagan Date: Fri, 28 Jul 2023 03:58:49 -0700 Subject: [PATCH 3/7] Signed-off-by: Jaganathan Anbalagan PM statistics is collected through scheduler --- sonic-xcvrd/xcvrd/pm_mgr.py | 109 +++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 51 deletions(-) diff --git a/sonic-xcvrd/xcvrd/pm_mgr.py b/sonic-xcvrd/xcvrd/pm_mgr.py index f16870ce3..d8970f40b 100644 --- a/sonic-xcvrd/xcvrd/pm_mgr.py +++ b/sonic-xcvrd/xcvrd/pm_mgr.py @@ -11,6 +11,7 @@ import threading import time import traceback + 
import sched from swsscommon import swsscommon from .xcvrd_utilities import sfp_status_helper @@ -381,64 +382,70 @@ def beautify_pm_info_dict(self, pm_info_dict, physical_port): def task_worker(self): self.log_notice("Start Performance monitoring for 400G ZR modules") sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, self, self.PORT_TBL_MAP) + #Schedule the PM + self.scheduler = sched.scheduler(time.time, time.sleep) + self.scheduler.enter(self.pm_interval, 1, self.pm_update_task_worker) + + #Run all scheduled events and handle port events + while not self.task_stopping_event.is_set(): + port_mapping.handle_port_update_event(sel, asic_context, self.task_stopping_event, self.helper_logger, self.on_port_update_event) + self.scheduler.run(blocking=False) + + def pm_update_task_worker(self): + + # Schedule the next run with pm_interval + self.scheduler.enter(self.pm_interval, 1, self.pm_update_task_worker) + #Iterate the ZR-400G port list for pm window update + for lport in self.pm_port_list: + asic_index = self.port_mapping_data.get_asic_id_for_logical_port(lport) + if asic_index is None: + continue + + physical_port = self.port_mapping_data.get_logical_to_physical(lport)[0] + if not physical_port: + continue - # Start loop to update PM stats info to DB periodically for PM interval time, default is 60sec - while not self.task_stopping_event.wait(self.pm_interval): - - # Handle port change event from main thread - port_mapping.handle_port_update_event(sel, asic_context, self.task_stopping_event, self, self.on_port_update_event) - for lport in self.pm_port_list: - asic_index = self.port_mapping_data.get_asic_id_for_logical_port(lport) - if asic_index is None: - continue - - physical_port = self.port_mapping_data.get_logical_to_physical(lport)[0] - if not physical_port: + #skip if sfp obj or sfp is not present + sfp = self.platform_chassis.get_sfp(physical_port) + if sfp is not None: + if not sfp.get_presence(): continue + else: + continue - #skip if sfp obj or sfp is not present - sfp = self.platform_chassis.get_sfp(physical_port) - if sfp is not None: - if not sfp.get_presence(): - continue - else: - continue + if self.get_port_admin_status(lport) != 'up': + continue - if self.get_port_admin_status(lport) != 'up': + if not sfp_status_helper.detect_port_in_error_status(lport, self.xcvr_table_helper.get_status_tbl(asic_index)): + try: + pm_hw_data = self.get_transceiver_pm(physical_port) + except (KeyError, TypeError) as e: + #continue to process next port since execption could be raised due to port reset, transceiver removal + self.log_warning("Got exception {} while reading pm stats for port {}, ignored".format(repr(e), lport)) continue - if not sfp_status_helper.detect_port_in_error_status(lport, self.xcvr_table_helper.get_status_tbl(asic_index)): - try: - pm_hw_data = self.get_transceiver_pm(physical_port) - except (KeyError, TypeError) as e: - #continue to process next port since execption could be raised due to port reset, transceiver removal - self.log_warning("Got exception {} while reading pm stats for port {}, ignored".format(repr(e), lport)) - continue - - if not pm_hw_data: - continue + if not pm_hw_data: + continue - self.beautify_pm_info_dict(pm_hw_data, physical_port) - - # Update 60Sec PM time window slots - start_window = WINDOW_60SEC_START_NUM - end_window = WINDOW_60SEC_END_NUM - pm_win_itrvl_in_secs = TIME_60SEC_IN_SECS - self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) - - # Update 15min PM 
time window slots - start_window = WINDOW_15MIN_START_NUM - end_window = WINDOW_15MIN_END_NUM - pm_win_itrvl_in_secs = TIME_15MIN_IN_SECS - self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) - - # Update 24hrs PM time window slots - start_window = WINDOW_24HRS_START_NUM - end_window = WINDOW_24HRS_END_NUM - pm_win_itrvl_in_secs = TIME_24HRS_IN_SECS - self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) - - self.log_notice("Stop Performance Monitoring") + self.beautify_pm_info_dict(pm_hw_data, physical_port) + + # Update 60Sec PM time window slots + start_window = WINDOW_60SEC_START_NUM + end_window = WINDOW_60SEC_END_NUM + pm_win_itrvl_in_secs = TIME_60SEC_IN_SECS + self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) + + # Update 15min PM time window slots + start_window = WINDOW_15MIN_START_NUM + end_window = WINDOW_15MIN_END_NUM + pm_win_itrvl_in_secs = TIME_15MIN_IN_SECS + self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) + + # Update 24hrs PM time window slots + start_window = WINDOW_24HRS_START_NUM + end_window = WINDOW_24HRS_END_NUM + pm_win_itrvl_in_secs = TIME_24HRS_IN_SECS + self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) def run(self): if self.task_stopping_event.is_set(): From 9ef1dbf5d45d0e103cb772a394288cb2b54a00ab Mon Sep 17 00:00:00 2001 From: Jaganathan Anbalagan Date: Fri, 18 Aug 2023 13:01:16 -0700 Subject: [PATCH 4/7] Signed-off-by: Jaganathan Anbalagan Made changes to control vdm freeze and unfreeze pm_mgr. --- sonic-xcvrd/xcvrd/pm_mgr.py | 26 ++++++++++++++++++- .../xcvrd_utilities/xcvr_table_helper.py | 4 +-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/sonic-xcvrd/xcvrd/pm_mgr.py b/sonic-xcvrd/xcvrd/pm_mgr.py index d8970f40b..3bdc140f7 100644 --- a/sonic-xcvrd/xcvrd/pm_mgr.py +++ b/sonic-xcvrd/xcvrd/pm_mgr.py @@ -81,6 +81,22 @@ def get_transceiver_pm(self, physical_port): except NotImplementedError: pass return {} + + def freeze_vdm_stats(self, physical_port): + if self.platform_chassis is not None: + try: + return self.platform_chassis.get_sfp(physical_port).freeze_vdm_stats() + except NotImplementedError: + pass + return {} + + def unfreeze_vdm_stats(self, physical_port): + if self.platform_chassis is not None: + try: + return self.platform_chassis.get_sfp(physical_port).unfreeze_vdm_stats() + except NotImplementedError: + pass + return {} def get_port_admin_status(self, lport): admin_status = 'down' @@ -121,7 +137,6 @@ def on_port_update_event(self, port_change_event): return physical_port = self.port_mapping_data.get_logical_to_physical(lport)[0] - self.log_notice("PM port event type:{} for lport: {} table name: {}".format(port_change_event.event_type, lport, port_change_event.table_name)) if port_change_event.event_type == port_change_event.PORT_SET: if (True if "ZR" not in str(port_change_event.port_dict.get('media_interface_code', None)) else False): return @@ -418,7 +433,16 @@ def pm_update_task_worker(self): if not sfp_status_helper.detect_port_in_error_status(lport, self.xcvr_table_helper.get_status_tbl(asic_index)): try: + if self.freeze_vdm_stats(physical_port): + self.log_error("vdm freeze failed for port {}".format(lport)) + continue + freeze_time = time.time() pm_hw_data = self.get_transceiver_pm(physical_port) + pm_hw_data['pm_win_end_time'] = freeze_time + 
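+                        # Unfreeze so the module resumes accumulating VDM statistics for
+                        # the next interval; if this fails, the next PM read may be stale.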
+ if self.unfreeze_vdm_stats(physical_port): + self.log_error("vdm unfreeze failed for port {}, pm data for the port is not current".format(lport)) + except (KeyError, TypeError) as e: #continue to process next port since execption could be raised due to port reset, transceiver removal self.log_warning("Got exception {} while reading pm stats for port {}, ignored".format(repr(e), lport)) diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py index 37d51f707..f13c1155f 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py @@ -9,13 +9,12 @@ TRANSCEIVER_DOM_SENSOR_TABLE = 'TRANSCEIVER_DOM_SENSOR' TRANSCEIVER_DOM_THRESHOLD_TABLE = 'TRANSCEIVER_DOM_THRESHOLD' TRANSCEIVER_STATUS_TABLE = 'TRANSCEIVER_STATUS' -TRANSCEIVER_PM_TABLE = 'TRANSCEIVER_PM' TRANSCEIVER_PM_WINDOW_STATS_TABLE = 'TRANSCEIVER_PM_WINDOW_STATS_TABLE' class XcvrTableHelper: def __init__(self, namespaces): self.int_tbl, self.dom_tbl, self.dom_threshold_tbl, self.status_tbl, self.app_port_tbl, \ - self.cfg_port_tbl, self.state_port_tbl, self.pm_tbl,self.pm_window_stats_tbl = {}, {}, {}, {}, {}, {}, {}, {}, {} + self.cfg_port_tbl, self.state_port_tbl, self.pm_window_stats_tbl = {}, {}, {}, {}, {}, {}, {}, {} self.state_db = {} self.cfg_db = {} for namespace in namespaces: @@ -25,7 +24,6 @@ def __init__(self, namespaces): self.dom_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) self.dom_threshold_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_THRESHOLD_TABLE) self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE) - self.pm_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_PM_TABLE) self.pm_window_stats_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_PM_WINDOW_STATS_TABLE) self.state_port_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], swsscommon.STATE_PORT_TABLE_NAME) appl_db = daemon_base.db_connect("APPL_DB", namespace) From 6aaf3047dd7d3bfd4bb8ac8aed0f4203f79114e6 Mon Sep 17 00:00:00 2001 From: Jaganathan Anbalagan Date: Fri, 18 Aug 2023 14:00:18 -0700 Subject: [PATCH 5/7] Signed-off-by: Jaganathan Anbalagan 60sec window will not be filled if the platform uses 120sec pm_interval, removed white space --- sonic-xcvrd/xcvrd/pm_mgr.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sonic-xcvrd/xcvrd/pm_mgr.py b/sonic-xcvrd/xcvrd/pm_mgr.py index 3bdc140f7..6634d6702 100644 --- a/sonic-xcvrd/xcvrd/pm_mgr.py +++ b/sonic-xcvrd/xcvrd/pm_mgr.py @@ -453,21 +453,22 @@ def pm_update_task_worker(self): self.beautify_pm_info_dict(pm_hw_data, physical_port) - # Update 60Sec PM time window slots - start_window = WINDOW_60SEC_START_NUM - end_window = WINDOW_60SEC_END_NUM - pm_win_itrvl_in_secs = TIME_60SEC_IN_SECS - self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) + if self.pm_interval <= PM_INFO_UPDATE_PERIOD_SECS: + # Update 60Sec PM time window slots + start_window = WINDOW_60SEC_START_NUM + end_window = WINDOW_60SEC_END_NUM + pm_win_itrvl_in_secs = TIME_60SEC_IN_SECS + self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) # Update 15min PM time window slots start_window = WINDOW_15MIN_START_NUM - end_window = WINDOW_15MIN_END_NUM + end_window = WINDOW_15MIN_END_NUM pm_win_itrvl_in_secs = TIME_15MIN_IN_SECS 
self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) # Update 24hrs PM time window slots start_window = WINDOW_24HRS_START_NUM - end_window = WINDOW_24HRS_END_NUM + end_window = WINDOW_24HRS_END_NUM pm_win_itrvl_in_secs = TIME_24HRS_IN_SECS self.pm_window_update_to_DB(lport, asic_index, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data) From a91e6fcf4e8307f3eaebe55f5f2aa00356e498f5 Mon Sep 17 00:00:00 2001 From: jaganbal-a <97986478+jaganbal-a@users.noreply.github.com> Date: Mon, 28 Aug 2023 13:59:43 -0400 Subject: [PATCH 6/7] Update xcvr_table_helper.py removed the _TABLE from the TRANSCEIVER_PM_WINDOW_STATS table name. --- sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py index f13c1155f..ded743378 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py @@ -9,7 +9,7 @@ TRANSCEIVER_DOM_SENSOR_TABLE = 'TRANSCEIVER_DOM_SENSOR' TRANSCEIVER_DOM_THRESHOLD_TABLE = 'TRANSCEIVER_DOM_THRESHOLD' TRANSCEIVER_STATUS_TABLE = 'TRANSCEIVER_STATUS' -TRANSCEIVER_PM_WINDOW_STATS_TABLE = 'TRANSCEIVER_PM_WINDOW_STATS_TABLE' +TRANSCEIVER_PM_WINDOW_STATS_TABLE = 'TRANSCEIVER_PM_WINDOW_STATS' class XcvrTableHelper: def __init__(self, namespaces): From 86e6172122f2a7ba84b3503ad18a7ba993a36189 Mon Sep 17 00:00:00 2001 From: Jaganathan Anbalagan Date: Thu, 7 Sep 2023 14:39:11 -0700 Subject: [PATCH 7/7] Signed-off-by: Jaganathan Anbalagan Corrected the function header. --- sonic-xcvrd/xcvrd/pm_mgr.py | 57 ++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/sonic-xcvrd/xcvrd/pm_mgr.py b/sonic-xcvrd/xcvrd/pm_mgr.py index 6634d6702..d4d084a84 100644 --- a/sonic-xcvrd/xcvrd/pm_mgr.py +++ b/sonic-xcvrd/xcvrd/pm_mgr.py @@ -173,16 +173,42 @@ def on_port_update_event(self, port_change_event): state_pm_win_stats_tbl._del(lport) self.log_notice("Port delete event:{} deleted from pm port list".format(port_change_event.port_name)) - # Update the PM statistics in string format to respective window key to - # TRANSCEIVER_PM_WINDOW_STATS_TABLE in state DB. def set_pm_stats_in_pm_win_stats_tbl(self, asic, lport, window, pm_data_str): + """ + + Update the PM statistics in string format to respective window number to + TRANSCEIVER_PM_WINDOW_STATS_TABLE in state DB. + Retrieve the PM statistics for a respective window number from + TRANSCEIVER_PM_WINDOW_STATS table. + + Args: + asic index: + Switch ASIC number. + logical port: + Front panel interface id. + window: + PM window number. + """ + state_pm_win_stats_tbl = self.xcvr_table_helper.get_pm_window_stats_tbl(asic) fvs = swsscommon.FieldValuePairs([(window, str(pm_data_str))]) state_pm_win_stats_tbl.set(lport, fvs) - # Retrieve the PM statistics from respective window key from. - # TRANSCEIVER_PM_WINDOW_STATS_TABLE - state DB. def get_pm_stats_in_pm_win_stats_tbl(self, asic, lport, window): + """ + + Retrieve the PM statistics for a respective window number from + TRANSCEIVER_PM_WINDOW_STATS table. + + Args: + asic index: + Switch ASIC number. + logical port: + Front panel interface id. + window: + PM window number. 
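+
+        Returns:
+            The PM statistics string stored for that window, or 'N/A' if the
+            window slot has not been populated yet.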
+        """
+
         state_pm_win_stats_tbl = self.xcvr_table_helper.get_pm_window_stats_tbl(asic)
         status, fvs = state_pm_win_stats_tbl.get(str(lport))
         stats_tbl_tuple = dict((k, v) for k, v in fvs)
@@ -191,12 +217,24 @@ def get_pm_stats_in_pm_win_stats_tbl(self, asic, lport, window):
         else:
             return None
 
-    #
-    # input args: asic index, logical port, start and end window number of a PM window granularity,
-    #             PM window granularity in secs, pm stats read from module.
-    #
-    # Algorithm to sample and update the PM stats to the appropriate PM window in TRANSCEIVER_PM_WINDOW_STATS_TABLE.
     def pm_window_update_to_DB(self, asic, lport, start_window, end_window, pm_win_itrvl_in_secs, pm_hw_data):
+        """
+        Algorithm to sample and update the PM stats into the appropriate PM window of the
+        TRANSCEIVER_PM_WINDOW_STATS table.
+
+        Args:
+            asic:
+                Switch ASIC index.
+            lport:
+                Front panel interface (logical port) name.
+            start_window:
+                First window number of this PM window granularity.
+            end_window:
+                Last window number of this PM window granularity.
+            pm_win_itrvl_in_secs:
+                PM window granularity (duration of one window slot) in seconds.
+            pm_hw_data:
+                PM info collected from the transceiver module.
+        """
+
         window_num = start_window
         while window_num < (end_window + 1):
             #Retrieve PM data from DB
@@ -322,7 +360,6 @@ def pm_window_update_to_DB(self, asic, lport, start_window, end_window, pm_win_i
 
     def average_of_two_val(self, val1, val2):
         return((val1+val2)/2)
 
-    # perform min, max and average of relevant PM stats parameter between two dictionary
     def pm_data_sampling(self, pm_data_dict1, pm_data_dict2):
         sampled_pm_dict = {}
         try: