Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[action] [PR:510] Add/modify VDM and Status related cmis fields for onboarding xcvr diagnostic features (#510) #511

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
logger.addHandler(logging.NullHandler())

CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP = {
"Laser Temperature [C]" : "laser_temperature_media",
"eSNR Media Input [dB]" : "esnr_media_input",
"PAM4 Level Transition Parameter Media Input [dB]" : "pam4_level_transition_media_input",
"Pre-FEC BER Minimum Media Input" : "prefec_ber_min_media_input",
Expand Down Expand Up @@ -2042,10 +2043,18 @@ def get_transceiver_status(self):
if rx_lol:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxcdrlol%d' % lane] = rx_lol[lane - 1]
tx_adaptive_eq_fail_flag_val = self.get_tx_adaptive_eq_fail_flag()
if tx_adaptive_eq_fail_flag_val:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['tx_eq_fault%d' % lane] = tx_adaptive_eq_fail_flag_val[lane - 1]
config_status_dict = self.get_config_datapath_hostlane_status()
if config_status_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['config_state_hostlane%d' % lane] = config_status_dict.get('ConfigStatusLane%d' % lane)
dedeint_hostlane = self.get_datapath_deinit()
if dedeint_hostlane is not None:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['dpdeinit_hostlane%d' % lane] = dedeint_hostlane[lane - 1]
dpinit_pending_dict = self.get_dpinit_pending()
if dpinit_pending_dict:
for lane in range(1, self.NUM_CHANNELS+1):
Expand Down Expand Up @@ -2208,6 +2217,12 @@ def set_datapath_deinit(self, channel):
data &= ~(1 << lane)
self.xcvr_eeprom.write(consts.DATAPATH_DEINIT_FIELD, data)

def get_datapath_deinit(self):
datapath_deinit = self.xcvr_eeprom.read(consts.DATAPATH_DEINIT_FIELD)
if datapath_deinit is None:
return None
return [bool(datapath_deinit & (1 << lane)) for lane in range(self.NUM_CHANNELS)]

def get_application_advertisement(self):
"""
Get the application advertisement of the CMIS transceiver
Expand Down Expand Up @@ -2393,6 +2408,30 @@ def get_tx_input_eq_max_val(self):
return None
return tx_input_max_val

def get_tx_adaptive_eq_fail_flag_supported(self):
"""
Returns whether the TX Adaptive Input EQ Fail Flag field is supported.
"""
return not self.is_flat_memory() and self.xcvr_eeprom.read(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG_SUPPORTED)

def get_tx_adaptive_eq_fail_flag(self):
"""
Returns the TX Adaptive Input EQ Fail Flag field on all lanes.
"""
tx_adaptive_eq_fail_flag_supported = self.get_tx_adaptive_eq_fail_flag_supported()
if tx_adaptive_eq_fail_flag_supported is None:
return None
if not tx_adaptive_eq_fail_flag_supported:
return ["N/A" for _ in range(self.NUM_CHANNELS)]
tx_adaptive_eq_fail_flag_val = self.xcvr_eeprom.read(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG)
if tx_adaptive_eq_fail_flag_val is None:
return None
keys = sorted(tx_adaptive_eq_fail_flag_val.keys())
tx_adaptive_eq_fail_flag_val_final = []
for key in keys:
tx_adaptive_eq_fail_flag_val_final.append(bool(tx_adaptive_eq_fail_flag_val[key]))
return tx_adaptive_eq_fail_flag_val_final

def get_tx_cdr_supported(self):
'''
This function returns the supported TX CDR field
Expand Down
2 changes: 2 additions & 0 deletions sonic_platform_base/sonic_xcvr/fields/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@
TX_LOS_SUPPORT_FIELD = "TxLOSSupported"
TX_CDR_LOL = "TxCDRLOL"
TX_CDR_LOL_SUPPORT_FIELD = "TxCDRLOLSupported"
TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG_SUPPORTED = "TxAdaptiveInputEqFailFlagSupported"
TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG = "TxAdaptiveInputEqFailFlag"
TX_POWER_HIGH_ALARM_FLAG = "TxPowerHighAlarmFlag"
TX_POWER_LOW_ALARM_FLAG = "TxPowerLowAlarmFlag"
TX_POWER_HIGH_WARN_FLAG = "TxPowerHighWarnFlag"
Expand Down
7 changes: 7 additions & 0 deletions sonic_platform_base/sonic_xcvr/mem_maps/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def __init__(self, codes):
RegBitField(consts.TX_FAULT_SUPPORT_FIELD, 0),
RegBitField(consts.TX_LOS_SUPPORT_FIELD, 1),
RegBitField(consts.TX_CDR_LOL_SUPPORT_FIELD, 2),
RegBitField(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG_SUPPORTED, 3),
),
NumberRegField(consts.RX_FLAGS_ADVT_FIELD, self.getaddr(0x1, 158),
RegBitField(consts.RX_LOS_SUPPORT, 1),
Expand Down Expand Up @@ -330,6 +331,12 @@ def __init__(self, codes):
)
for lane in range(1, 9))
),
RegGroupField(consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG,
*(NumberRegField("%s%d" % (consts.TX_ADAPTIVE_INPUT_EQ_FAIL_FLAG, lane), self.getaddr(0x11, 138),
RegBitField("Bit%d" % (lane-1), (lane-1))
)
for lane in range(1, 9))
),
RegGroupField(consts.TX_POWER_HIGH_ALARM_FLAG,
*(NumberRegField("%s%d" % (consts.TX_POWER_HIGH_ALARM_FLAG, lane), self.getaddr(0x11, 139),
RegBitField("Bit%d" % (lane-1), (lane-1))
Expand Down
70 changes: 66 additions & 4 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1826,7 +1826,9 @@ def test_get_transceiver_threshold_info(self, mock_response, expected):
'Pre-FEC BER Average Media Input':{1:[0.001, 0.0125, 0, 0.01, 0, False, False, False, False]},
'Errored Frames Average Media Input':{1:[0, 1, 0, 1, 0, False, False, False, False]},
},
0, [False, False, False, False, False, False, False, False]
0, [False, False, False, False, False, False, False, False],
[False, True, False, False, False, False, False, False],
[False, False, False, False, False, False, False, False]
],
{
'module_state': 'ModuleReady',
Expand Down Expand Up @@ -1907,6 +1909,14 @@ def test_get_transceiver_threshold_info(self, mock_response, expected):
'rxcdrlol6': False,
'rxcdrlol7': False,
'rxcdrlol8': False,
'tx_eq_fault1': False,
'tx_eq_fault2': False,
'tx_eq_fault3': False,
'tx_eq_fault4': False,
'tx_eq_fault5': False,
'tx_eq_fault6': False,
'tx_eq_fault7': False,
'tx_eq_fault8': False,
'config_state_hostlane1': 'ConfigSuccess',
'config_state_hostlane2': 'ConfigSuccess',
'config_state_hostlane3': 'ConfigSuccess',
Expand All @@ -1915,6 +1925,14 @@ def test_get_transceiver_threshold_info(self, mock_response, expected):
'config_state_hostlane6': 'ConfigSuccess',
'config_state_hostlane7': 'ConfigSuccess',
'config_state_hostlane8': 'ConfigSuccess',
'dpdeinit_hostlane1' : False,
'dpdeinit_hostlane2' : True,
'dpdeinit_hostlane3' : False,
'dpdeinit_hostlane4' : False,
'dpdeinit_hostlane5' : False,
'dpdeinit_hostlane6' : False,
'dpdeinit_hostlane7' : False,
'dpdeinit_hostlane8' : False,
'dpinit_pending_hostlane1': False,
'dpinit_pending_hostlane2': False,
'dpinit_pending_hostlane3': False,
Expand Down Expand Up @@ -2070,7 +2088,7 @@ def test_get_transceiver_threshold_info(self, mock_response, expected):
'Pre-FEC BER Average Media Input':{1:[0.001, 0.0125, 0, 0.01, 0, False, False, False, False]},
'Errored Frames Average Media Input':{1:[0, 1, 0, 1, 0, False, False, False, False]},
},
None, None
None, None, None, None
],
{
'module_state': 'ModuleReady',
Expand Down Expand Up @@ -2133,8 +2151,13 @@ def test_get_transceiver_status(self, mock_response, expected):
self.api.get_tx_disable_channel.return_value = mock_response[20]
self.api.get_tx_disable = MagicMock()
self.api.get_tx_disable.return_value = mock_response[21]
result = self.api.get_transceiver_status()
assert result == expected
with patch.object(self.api, 'get_datapath_deinit', return_value=mock_response[22]), \
patch.object(self.api, 'get_tx_adaptive_eq_fail_flag', return_value=mock_response[23]):
self.api.vdm = MagicMock()
self.api.vdm.return_value.VDM_FLAG = 'mocked_value'

result = self.api.get_transceiver_status()
assert result == expected

@pytest.mark.parametrize("mock_response, expected",[
(
Expand Down Expand Up @@ -2309,6 +2332,18 @@ def test_set_datapath_deinit(self):
assert kall is not None
assert kall[0] == (consts.DATAPATH_DEINIT_FIELD, 0xff)

@pytest.mark.parametrize("mock_response, expected", [
(None, None),
(0x00, [False for _ in range(8)]),
(0xff, [True for _ in range(8)]),
(0x0f, [True, True, True, True, False, False, False, False]),
])
def test_get_datapath_deinit(self, mock_response, expected):
self.api.xcvr_eeprom = MagicMock()
self.api.xcvr_eeprom.read.return_value = mock_response

assert self.api.get_datapath_deinit() == expected

def test_get_application_advertisement(self):
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.side_effect = [
Expand Down Expand Up @@ -2666,3 +2701,30 @@ def test_get_transceiver_info_firmware_versions(self):
self.api.get_module_fw_info.side_effect = [{'result': ( '', '', '', '', '', '', '', '','2.0.0', '1.0.0')}]
result = self.api.get_transceiver_info_firmware_versions()
assert result == expected_result


@pytest.mark.parametrize("mock_flat_memory, mock_response, expected", [
(False, True, True),
(False, False, False),
(False, None, None),
(True, True, False),
])
def test_get_tx_adaptive_eq_fail_flag_supported(self, mock_flat_memory, mock_response, expected):
with patch.object(self.api, 'is_flat_memory', return_value=mock_flat_memory):
self.api.xcvr_eeprom.read = MagicMock(return_value=mock_response)
result = self.api.get_tx_adaptive_eq_fail_flag_supported()
assert result == expected

@pytest.mark.parametrize("mock_response, expected", [
([True, {'TxAdaptiveEqFailFlag1': 1, 'TxAdaptiveEqFailFlag2': 0}], [True, False]),
([False, {'TxAdaptiveEqFailFlag1': 1, 'TxAdaptiveEqFailFlag2': 0}], ['N/A' for _ in range(8)]),
([None, None], None),
([True, None], None)
])
def test_get_tx_adaptive_eq_fail_flag(self, mock_response, expected):
self.api.get_tx_adaptive_eq_fail_flag_supported = MagicMock()
self.api.get_tx_adaptive_eq_fail_flag_supported.return_value = mock_response[0]
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.return_value = mock_response[1]
result = self.api.get_tx_adaptive_eq_fail_flag()
assert result == expected
Loading