diff --git a/sonic_platform_base/sonic_xcvr/api/public/cmis.py b/sonic_platform_base/sonic_xcvr/api/public/cmis.py index 9f312a900..4a2c161a9 100644 --- a/sonic_platform_base/sonic_xcvr/api/public/cmis.py +++ b/sonic_platform_base/sonic_xcvr/api/public/cmis.py @@ -22,6 +22,7 @@ logger.addHandler(logging.NullHandler()) CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP = { + "Laser Temperature [C]" : "laser_temperature_media", "eSNR Media Input [dB]" : "esnr_media_input", "PAM4 Level Transition Parameter Media Input [dB]" : "pam4_level_transition_media_input", "Pre-FEC BER Minimum Media Input" : "prefec_ber_min_media_input", @@ -2042,10 +2043,18 @@ def get_transceiver_status(self): if rx_lol: for lane in range(1, self.NUM_CHANNELS+1): trans_status['rxcdrlol%d' % lane] = rx_lol[lane - 1] + tx_adaptive_eq_fail_flag_val = self.get_tx_adaptive_eq_fail_flag() + if tx_adaptive_eq_fail_flag_val: + for lane in range(1, self.NUM_CHANNELS+1): + trans_status['tx_eq_fault%d' % lane] = tx_adaptive_eq_fail_flag_val[lane - 1] config_status_dict = self.get_config_datapath_hostlane_status() if config_status_dict: for lane in range(1, self.NUM_CHANNELS+1): trans_status['config_state_hostlane%d' % lane] = config_status_dict.get('ConfigStatusLane%d' % lane) + dedeint_hostlane = self.get_datapath_deinit() + if dedeint_hostlane is not None: + for lane in range(1, self.NUM_CHANNELS+1): + trans_status['dpdeinit_hostlane%d' % lane] = dedeint_hostlane[lane - 1] dpinit_pending_dict = self.get_dpinit_pending() if dpinit_pending_dict: for lane in range(1, self.NUM_CHANNELS+1): @@ -2208,6 +2217,12 @@ def set_datapath_deinit(self, channel): data &= ~(1 << lane) self.xcvr_eeprom.write(consts.DATAPATH_DEINIT_FIELD, data) + def get_datapath_deinit(self): + datapath_deinit = self.xcvr_eeprom.read(consts.DATAPATH_DEINIT_FIELD) + if datapath_deinit is None: + return None + return [bool(datapath_deinit & (1 << lane)) for lane in range(self.NUM_CHANNELS)] + def get_application_advertisement(self): """ Get the application advertisement of the CMIS transceiver @@ -2393,6 +2408,30 @@ def get_tx_input_eq_max_val(self): return None return tx_input_max_val + def get_tx_adaptive_eq_fail_flag_supported(self): + """ + Returns whether the TX Adaptive Input EQ Fail Flag field is supported. + """ + return not self.is_flat_memory() and self.xcvr_eeprom.read(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG_SUPPORTED) + + def get_tx_adaptive_eq_fail_flag(self): + """ + Returns the TX Adaptive Input EQ Fail Flag field on all lanes. + """ + tx_adaptive_eq_fail_flag_supported = self.get_tx_adaptive_eq_fail_flag_supported() + if tx_adaptive_eq_fail_flag_supported is None: + return None + if not tx_adaptive_eq_fail_flag_supported: + return ["N/A" for _ in range(self.NUM_CHANNELS)] + tx_adaptive_eq_fail_flag_val = self.xcvr_eeprom.read(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG) + if tx_adaptive_eq_fail_flag_val is None: + return None + keys = sorted(tx_adaptive_eq_fail_flag_val.keys()) + tx_adaptive_eq_fail_flag_val_final = [] + for key in keys: + tx_adaptive_eq_fail_flag_val_final.append(bool(tx_adaptive_eq_fail_flag_val[key])) + return tx_adaptive_eq_fail_flag_val_final + def get_tx_cdr_supported(self): ''' This function returns the supported TX CDR field diff --git a/sonic_platform_base/sonic_xcvr/fields/consts.py b/sonic_platform_base/sonic_xcvr/fields/consts.py index 030ea57ed..cca473b9a 100644 --- a/sonic_platform_base/sonic_xcvr/fields/consts.py +++ b/sonic_platform_base/sonic_xcvr/fields/consts.py @@ -251,6 +251,8 @@ TX_LOS_SUPPORT_FIELD = "TxLOSSupported" TX_CDR_LOL = "TxCDRLOL" TX_CDR_LOL_SUPPORT_FIELD = "TxCDRLOLSupported" +TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG_SUPPORTED = "TxAdaptiveInputEqFailFlagSupported" +TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG = "TxAdaptiveInputEqFailFlag" TX_POWER_HIGH_ALARM_FLAG = "TxPowerHighAlarmFlag" TX_POWER_LOW_ALARM_FLAG = "TxPowerLowAlarmFlag" TX_POWER_HIGH_WARN_FLAG = "TxPowerHighWarnFlag" diff --git a/sonic_platform_base/sonic_xcvr/mem_maps/public/cmis.py b/sonic_platform_base/sonic_xcvr/mem_maps/public/cmis.py index 8702e76f3..a08a90efa 100644 --- a/sonic_platform_base/sonic_xcvr/mem_maps/public/cmis.py +++ b/sonic_platform_base/sonic_xcvr/mem_maps/public/cmis.py @@ -219,6 +219,7 @@ def __init__(self, codes): RegBitField(consts.TX_FAULT_SUPPORT_FIELD, 0), RegBitField(consts.TX_LOS_SUPPORT_FIELD, 1), RegBitField(consts.TX_CDR_LOL_SUPPORT_FIELD, 2), + RegBitField(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG_SUPPORTED, 3), ), NumberRegField(consts.RX_FLAGS_ADVT_FIELD, self.getaddr(0x1, 158), RegBitField(consts.RX_LOS_SUPPORT, 1), @@ -330,6 +331,12 @@ def __init__(self, codes): ) for lane in range(1, 9)) ), + RegGroupField(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG, + *(NumberRegField("%s%d" % (consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG, lane), self.getaddr(0x11, 138), + RegBitField("Bit%d" % (lane-1), (lane-1)) + ) + for lane in range(1, 9)) + ), RegGroupField(consts.TX_POWER_HIGH_ALARM_FLAG, *(NumberRegField("%s%d" % (consts.TX_POWER_HIGH_ALARM_FLAG, lane), self.getaddr(0x11, 139), RegBitField("Bit%d" % (lane-1), (lane-1)) diff --git a/tests/sonic_xcvr/test_cmis.py b/tests/sonic_xcvr/test_cmis.py index a5054f1aa..20a96b68b 100644 --- a/tests/sonic_xcvr/test_cmis.py +++ b/tests/sonic_xcvr/test_cmis.py @@ -1826,7 +1826,9 @@ def test_get_transceiver_threshold_info(self, mock_response, expected): 'Pre-FEC BER Average Media Input':{1:[0.001, 0.0125, 0, 0.01, 0, False, False, False, False]}, 'Errored Frames Average Media Input':{1:[0, 1, 0, 1, 0, False, False, False, False]}, }, - 0, [False, False, False, False, False, False, False, False] + 0, [False, False, False, False, False, False, False, False], + [False, True, False, False, False, False, False, False], + [False, False, False, False, False, False, False, False] ], { 'module_state': 'ModuleReady', @@ -1907,6 +1909,14 @@ def test_get_transceiver_threshold_info(self, mock_response, expected): 'rxcdrlol6': False, 'rxcdrlol7': False, 'rxcdrlol8': False, + 'tx_eq_fault1': False, + 'tx_eq_fault2': False, + 'tx_eq_fault3': False, + 'tx_eq_fault4': False, + 'tx_eq_fault5': False, + 'tx_eq_fault6': False, + 'tx_eq_fault7': False, + 'tx_eq_fault8': False, 'config_state_hostlane1': 'ConfigSuccess', 'config_state_hostlane2': 'ConfigSuccess', 'config_state_hostlane3': 'ConfigSuccess', @@ -1915,6 +1925,14 @@ def test_get_transceiver_threshold_info(self, mock_response, expected): 'config_state_hostlane6': 'ConfigSuccess', 'config_state_hostlane7': 'ConfigSuccess', 'config_state_hostlane8': 'ConfigSuccess', + 'dpdeinit_hostlane1' : False, + 'dpdeinit_hostlane2' : True, + 'dpdeinit_hostlane3' : False, + 'dpdeinit_hostlane4' : False, + 'dpdeinit_hostlane5' : False, + 'dpdeinit_hostlane6' : False, + 'dpdeinit_hostlane7' : False, + 'dpdeinit_hostlane8' : False, 'dpinit_pending_hostlane1': False, 'dpinit_pending_hostlane2': False, 'dpinit_pending_hostlane3': False, @@ -2070,7 +2088,7 @@ def test_get_transceiver_threshold_info(self, mock_response, expected): 'Pre-FEC BER Average Media Input':{1:[0.001, 0.0125, 0, 0.01, 0, False, False, False, False]}, 'Errored Frames Average Media Input':{1:[0, 1, 0, 1, 0, False, False, False, False]}, }, - None, None + None, None, None, None ], { 'module_state': 'ModuleReady', @@ -2133,8 +2151,13 @@ def test_get_transceiver_status(self, mock_response, expected): self.api.get_tx_disable_channel.return_value = mock_response[20] self.api.get_tx_disable = MagicMock() self.api.get_tx_disable.return_value = mock_response[21] - result = self.api.get_transceiver_status() - assert result == expected + with patch.object(self.api, 'get_datapath_deinit', return_value=mock_response[22]), \ + patch.object(self.api, 'get_tx_adaptive_eq_fail_flag', return_value=mock_response[23]): + self.api.vdm = MagicMock() + self.api.vdm.return_value.VDM_FLAG = 'mocked_value' + + result = self.api.get_transceiver_status() + assert result == expected @pytest.mark.parametrize("mock_response, expected",[ ( @@ -2309,6 +2332,18 @@ def test_set_datapath_deinit(self): assert kall is not None assert kall[0] == (consts.DATAPATH_DEINIT_FIELD, 0xff) + @pytest.mark.parametrize("mock_response, expected", [ + (None, None), + (0x00, [False for _ in range(8)]), + (0xff, [True for _ in range(8)]), + (0x0f, [True, True, True, True, False, False, False, False]), + ]) + def test_get_datapath_deinit(self, mock_response, expected): + self.api.xcvr_eeprom = MagicMock() + self.api.xcvr_eeprom.read.return_value = mock_response + + assert self.api.get_datapath_deinit() == expected + def test_get_application_advertisement(self): self.api.xcvr_eeprom.read = MagicMock() self.api.xcvr_eeprom.read.side_effect = [ @@ -2666,3 +2701,30 @@ def test_get_transceiver_info_firmware_versions(self): self.api.get_module_fw_info.side_effect = [{'result': ( '', '', '', '', '', '', '', '','2.0.0', '1.0.0')}] result = self.api.get_transceiver_info_firmware_versions() assert result == expected_result + + + @pytest.mark.parametrize("mock_flat_memory, mock_response, expected", [ + (False, True, True), + (False, False, False), + (False, None, None), + (True, True, False), + ]) + def test_get_tx_adaptive_eq_fail_flag_supported(self, mock_flat_memory, mock_response, expected): + with patch.object(self.api, 'is_flat_memory', return_value=mock_flat_memory): + self.api.xcvr_eeprom.read = MagicMock(return_value=mock_response) + result = self.api.get_tx_adaptive_eq_fail_flag_supported() + assert result == expected + + @pytest.mark.parametrize("mock_response, expected", [ + ([True, {'TxAdaptiveEqFailFlag1': 1, 'TxAdaptiveEqFailFlag2': 0}], [True, False]), + ([False, {'TxAdaptiveEqFailFlag1': 1, 'TxAdaptiveEqFailFlag2': 0}], ['N/A' for _ in range(8)]), + ([None, None], None), + ([True, None], None) + ]) + def test_get_tx_adaptive_eq_fail_flag(self, mock_response, expected): + self.api.get_tx_adaptive_eq_fail_flag_supported = MagicMock() + self.api.get_tx_adaptive_eq_fail_flag_supported.return_value = mock_response[0] + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.return_value = mock_response[1] + result = self.api.get_tx_adaptive_eq_fail_flag() + assert result == expected \ No newline at end of file