diff --git a/sonic-psud/scripts/psud b/sonic-psud/scripts/psud index ce588589c..3bcae4c76 100644 --- a/sonic-psud/scripts/psud +++ b/sonic-psud/scripts/psud @@ -125,15 +125,6 @@ def get_psu_key(psu_index): return PSU_INFO_KEY_TEMPLATE.format(psu_index) -def psu_db_update(psu_tbl, psu_num): - for psu_index in range(1, psu_num + 1): - fvs = swsscommon.FieldValuePairs([(PSU_INFO_PRESENCE_FIELD, - 'true' if _wrapper_get_psu_presence(psu_index) else 'false'), - (PSU_INFO_STATUS_FIELD, - 'true' if _wrapper_get_psu_status(psu_index) else 'false')]) - psu_tbl.set(get_psu_key(psu_index), fvs) - - # try get information from platform API and return a default value if we catch NotImplementedError def try_get(callback, default=None): """ @@ -443,7 +434,6 @@ class DaemonPsud(daemon_base.DaemonBase): return False self._update_psu_entity_info() - psu_db_update(self.psu_tbl, self.num_psus) self.update_psu_data() self._update_led_color() @@ -606,6 +596,8 @@ class DaemonPsud(daemon_base.DaemonBase): (PSU_INFO_IN_CURRENT_FIELD, str(in_current)), (PSU_INFO_IN_VOLTAGE_FIELD, str(in_voltage)), (PSU_INFO_POWER_MAX_FIELD, str(max_power)), + (PSU_INFO_PRESENCE_FIELD, 'true' if _wrapper_get_psu_presence(index) else 'false'), + (PSU_INFO_STATUS_FIELD, 'true' if _wrapper_get_psu_status(index) else 'false'), ]) self.psu_tbl.set(name, fvs) diff --git a/sonic-psud/tests/test_DaemonPsud.py b/sonic-psud/tests/test_DaemonPsud.py index 805c76cc0..05d7d5dda 100644 --- a/sonic-psud/tests/test_DaemonPsud.py +++ b/sonic-psud/tests/test_DaemonPsud.py @@ -162,6 +162,8 @@ def _construct_expected_fvp(self, power=100.0, power_warning_suppress_threshold= (psud.PSU_INFO_IN_VOLTAGE_FIELD, '220.25'), (psud.PSU_INFO_IN_CURRENT_FIELD, '0.72'), (psud.PSU_INFO_POWER_MAX_FIELD, 'N/A'), + (psud.PSU_INFO_PRESENCE_FIELD, 'true'), + (psud.PSU_INFO_STATUS_FIELD, 'true'), ]) return expected_fvp diff --git a/sonic-psud/tests/test_psud.py b/sonic-psud/tests/test_psud.py index 405ec30c6..b95a4986a 100644 --- a/sonic-psud/tests/test_psud.py +++ b/sonic-psud/tests/test_psud.py @@ -96,70 +96,6 @@ def test_wrapper_get_psu_status(): psud.platform_psuutil.get_psu_status.assert_called_with(1) -@mock.patch('psud._wrapper_get_psu_presence', mock.MagicMock()) -@mock.patch('psud._wrapper_get_psu_status', mock.MagicMock()) -def test_psu_db_update(): - psu_tbl = mock.MagicMock() - - psud._wrapper_get_psu_presence.return_value = True - psud._wrapper_get_psu_status.return_value = True - expected_fvp = psud.swsscommon.FieldValuePairs( - [(psud.PSU_INFO_PRESENCE_FIELD, 'true'), - (psud.PSU_INFO_STATUS_FIELD, 'true'), - ]) - psud.psu_db_update(psu_tbl, 1) - assert psu_tbl.set.call_count == 1 - psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) - - psu_tbl.set.reset_mock() - - psud._wrapper_get_psu_presence.return_value = False - psud._wrapper_get_psu_status.return_value = True - expected_fvp = psud.swsscommon.FieldValuePairs( - [(psud.PSU_INFO_PRESENCE_FIELD, 'false'), - (psud.PSU_INFO_STATUS_FIELD, 'true'), - ]) - psud.psu_db_update(psu_tbl, 1) - assert psu_tbl.set.call_count == 1 - psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) - - psu_tbl.set.reset_mock() - - psud._wrapper_get_psu_presence.return_value = True - psud._wrapper_get_psu_status.return_value = False - expected_fvp = psud.swsscommon.FieldValuePairs( - [(psud.PSU_INFO_PRESENCE_FIELD, 'true'), - (psud.PSU_INFO_STATUS_FIELD, 'false'), - ]) - psud.psu_db_update(psu_tbl, 1) - assert psu_tbl.set.call_count == 1 - psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) - - psu_tbl.set.reset_mock() - - psud._wrapper_get_psu_presence.return_value = False - psud._wrapper_get_psu_status.return_value = False - expected_fvp = psud.swsscommon.FieldValuePairs( - [(psud.PSU_INFO_PRESENCE_FIELD, 'false'), - (psud.PSU_INFO_STATUS_FIELD, 'false'), - ]) - psud.psu_db_update(psu_tbl, 1) - assert psu_tbl.set.call_count == 1 - psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) - - psu_tbl.set.reset_mock() - - psud._wrapper_get_psu_presence.return_value = True - psud._wrapper_get_psu_status.return_value = True - expected_fvp = psud.swsscommon.FieldValuePairs( - [(psud.PSU_INFO_PRESENCE_FIELD, 'true'), - (psud.PSU_INFO_STATUS_FIELD, 'true'), - ]) - psud.psu_db_update(psu_tbl, 32) - assert psu_tbl.set.call_count == 32 - psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(32), expected_fvp) - - def test_log_on_status_changed(): normal_log = "Normal log message" abnormal_log = "Abnormal log message" diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 85fdccd0d..438f40c22 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -1,5 +1,5 @@ #from unittest.mock import DEFAULT -from xcvrd.xcvrd_utilities.port_mapping import * +from xcvrd.xcvrd_utilities.port_event_helper import * from xcvrd.xcvrd_utilities.sfp_status_helper import * from xcvrd.xcvrd_utilities.media_settings_parser import * from xcvrd.xcvrd_utilities.optics_si_parser import * @@ -157,7 +157,7 @@ def test_CmisManagerTask_task_run_with_exception(self): assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) assert("wait_for_port_config_done" in str(trace)) - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) def test_DomInfoUpdateTask_task_run_with_exception(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -178,7 +178,7 @@ def test_DomInfoUpdateTask_task_run_with_exception(self): assert("subscribe_port_config_change" in str(trace)) @patch('xcvrd.xcvrd.SfpStateUpdateTask.init', MagicMock()) - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) def test_SfpStateUpdateTask_task_run_with_exception(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -240,7 +240,7 @@ def test_is_cmis_api(self, mock_class, expected_return_value): assert is_cmis_api(mock_xcvr_api) == expected_return_value @patch('xcvrd.xcvrd._wrapper_get_sfp_type') - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_info', MagicMock(return_value={'temperature': '22.75', 'voltage': '0.5', @@ -277,7 +277,7 @@ def test_post_port_dom_info_to_db(self, mock_get_sfp_type): mock_get_sfp_type.return_value = 'QSFP_DD' post_port_dom_info_to_db(logical_port_name, port_mapping, dom_tbl, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_firmware_info', MagicMock(return_value={'active_firmware': '2.1.1', 'inactive_firmware': '1.2.4'})) @@ -299,7 +299,7 @@ def test_post_port_dom_threshold_info_to_db(self, mock_get_sfp_type): mock_get_sfp_type.return_value = 'QSFP_DD' post_port_dom_info_to_db(logical_port_name, port_mapping, dom_threshold_tbl, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_pm', MagicMock(return_value={'prefec_ber_avg': '0.0003407240007014899', 'prefec_ber_min': '0.0006814479342250317', @@ -316,7 +316,7 @@ def test_post_port_pm_info_to_db(self): post_port_pm_info_to_db(logical_port_name, port_mapping, pm_tbl, stop_event) assert pm_tbl.get_size_for_key(logical_port_name) == 6 - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) def test_del_port_sfp_dom_info_from_db(self): logical_port_name = "Ethernet0" @@ -364,7 +364,7 @@ def test_delete_port_from_status_table_sw(self): delete_port_from_status_table_sw(logical_port_name, status_tbl) assert status_tbl.get_size_for_key(logical_port_name) == 1 - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_threshold_info', MagicMock(return_value={'temphighalarm': '22.75', 'temphighwarning': '0.5', @@ -393,7 +393,7 @@ def test_post_port_dom_threshold_info_to_db(self): dom_threshold_tbl = Table("STATE_DB", TRANSCEIVER_DOM_THRESHOLD_TABLE) post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, dom_threshold_tbl, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_info', MagicMock(return_value={'type': '22.75', @@ -445,7 +445,7 @@ def test_post_port_sfp_info_to_db(self): transceiver_dict = {} post_port_sfp_info_to_db(logical_port_name, port_mapping, dom_tbl, transceiver_dict, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @@ -525,7 +525,7 @@ def test_post_port_sfp_info_and_dom_thr_to_db_once(self): task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) task._post_port_sfp_info_and_dom_thr_to_db_once(port_mapping, xcvr_table_helper, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @@ -802,19 +802,154 @@ def test_is_error_sfp_status(self): @patch('swsscommon.swsscommon.SubscriberStateTable') @patch('swsscommon.swsscommon.Select.select') def test_handle_port_update_event(self, mock_select, mock_sub_table): + class DummyPortChangeEventHandler: + def __init__(self): + self.port_event_cache = [] + + def handle_port_change_event(self, port_event): + self.port_event_cache.append(port_event) + + CONFIG_DB = 'CONFIG_DB' + PORT_TABLE = swsscommon.CFG_PORT_TABLE_NAME + port_change_event_handler = DummyPortChangeEventHandler() + expected_processed_event_count = 0 + mock_selectable = MagicMock() - mock_selectable.pop = MagicMock( - side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)]) + side_effect_list = [ + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '40000'), ('fec', 'rs'))), + (None, None, None) + ] + mock_selectable.pop = MagicMock(side_effect=side_effect_list) mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) mock_sub_table.return_value = mock_selectable logger = MagicMock() - - sel, asic_context = subscribe_port_update_event(DEFAULT_NAMESPACE, logger) - port_mapping = PortMapping() stop_event = threading.Event() stop_event.is_set = MagicMock(return_value=False) - handle_port_update_event(sel, asic_context, stop_event, - logger, port_mapping.handle_port_change_event) + + observer = PortChangeObserver(DEFAULT_NAMESPACE, logger, stop_event, + port_change_event_handler.handle_port_change_event, + [{CONFIG_DB: PORT_TABLE}]) + + # Test basic single update event without filtering: + assert observer.handle_port_update_event() + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + # 'fec' should not be filtered out + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.SET_COMMAND, + 'asic_id': 0, + 'speed': '40000', + 'fec': 'rs' + } + } + assert observer.port_event_cache == expected_cache + + observer = PortChangeObserver(DEFAULT_NAMESPACE, logger, stop_event, + port_change_event_handler.handle_port_change_event, + [{CONFIG_DB: PORT_TABLE, 'FILTER': ['speed']}]) + mock_selectable.pop.side_effect = iter(side_effect_list) + + # Test basic single update event with filtering: + assert not observer.port_event_cache + assert observer.handle_port_update_event() + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + # 'fec' should be filtered out + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.SET_COMMAND, + 'asic_id': 0, + 'speed': '40000', + } + } + assert observer.port_event_cache == expected_cache + assert port_change_event_handler.port_event_cache[-1].port_name == 'Ethernet0' + assert port_change_event_handler.port_event_cache[-1].event_type == PortChangeEvent.PORT_SET + assert port_change_event_handler.port_event_cache[-1].port_index == 1 + assert port_change_event_handler.port_event_cache[-1].asic_id == 0 + assert port_change_event_handler.port_event_cache[-1].db_name == CONFIG_DB + assert port_change_event_handler.port_event_cache[-1].table_name == PORT_TABLE + assert port_change_event_handler.port_event_cache[-1].port_dict == \ + expected_cache[('Ethernet0', CONFIG_DB, PORT_TABLE)] + + # Test duplicate update event on the same key: + mock_selectable.pop.side_effect = iter(side_effect_list) + # return False when no new event is processed + assert not observer.handle_port_update_event() + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + assert observer.port_event_cache == expected_cache + + # Test soaking multiple different update events on the same key: + side_effect_list = [ + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '100000'))), + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '200000'))), + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '400000'))), + (None, None, None) + ] + mock_selectable.pop.side_effect = iter(side_effect_list) + assert observer.handle_port_update_event() + # only the last event should be processed + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.SET_COMMAND, + 'asic_id': 0, + 'speed': '400000', + } + } + assert observer.port_event_cache == expected_cache + + # Test select timeout case: + mock_select.return_value = (swsscommon.Select.TIMEOUT, None) + assert not observer.handle_port_update_event() + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + mock_select.return_value = (swsscommon.Select.OBJECT, None) + + # Test update event for DEL case: + side_effect_list = [ + ('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), ('speed', '400000'))), + (None, None, None) + ] + mock_selectable.pop.side_effect = iter(side_effect_list) + assert observer.handle_port_update_event() + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.DEL_COMMAND, + 'asic_id': 0, + 'speed': '400000', + } + } + assert observer.port_event_cache == expected_cache + + # Test update event if it's a subset of cached event: + side_effect_list = [ + ('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )), + (None, None, None) + ] + mock_selectable.pop.side_effect = iter(side_effect_list) + assert not observer.handle_port_update_event() + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.DEL_COMMAND, + 'asic_id': 0, + } + } + assert observer.port_event_cache == expected_cache @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) @patch('swsscommon.swsscommon.SubscriberStateTable') @@ -952,8 +1087,7 @@ def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper assert task.get_configured_tx_power_from_db('Ethernet0') == -10 @patch('xcvrd.xcvrd.platform_chassis') - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) + @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) def test_CmisManagerTask_task_run_stop(self, mock_chassis): mock_object = MagicMock() mock_object.get_presence = MagicMock(return_value=True) @@ -1180,8 +1314,7 @@ def test_CmisManagerTask_post_port_active_apsel_to_db(self): @patch('xcvrd.xcvrd.platform_chassis') - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) + @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) @patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) @patch('xcvrd.xcvrd.is_cmis_api', MagicMock(return_value=True)) @@ -1328,6 +1461,39 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): assert mock_xcvr_api.tx_disable_channel.call_count == 2 assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_ACTIVATION' + @pytest.mark.parametrize("lport, expected_dom_polling", [ + ('Ethernet0', 'disabled'), + ('Ethernet4', 'disabled'), + ('Ethernet8', 'disabled'), + ('Ethernet12', 'disabled'), + ('Ethernet16', 'enabled'), + ('Ethernet20', 'enabled') + ]) + def test_DomInfoUpdateTask_get_dom_polling_from_config_db(self, lport, expected_dom_polling): + # Define the mock_get function inside the test function + def mock_get(key): + if key in ['Ethernet4', 'Ethernet8', 'Ethernet12', 'Ethernet16']: + return (True, [('dom_polling', 'enabled')]) + elif key == 'Ethernet0': + return (True, [('dom_polling', 'disabled')]) + else: + return None + + port_mapping = PortMapping() + stop_event = threading.Event() + task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) + task.port_mapping.handle_port_change_event(PortChangeEvent('Ethernet4', 1, 0, PortChangeEvent.PORT_ADD)) + task.port_mapping.handle_port_change_event(PortChangeEvent('Ethernet12', 1, 0, PortChangeEvent.PORT_ADD)) + task.port_mapping.handle_port_change_event(PortChangeEvent('Ethernet8', 1, 0, PortChangeEvent.PORT_ADD)) + task.port_mapping.handle_port_change_event(PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)) + task.port_mapping.handle_port_change_event(PortChangeEvent('Ethernet16', 2, 0, PortChangeEvent.PORT_ADD)) + cfg_port_tbl = MagicMock() + cfg_port_tbl.get = MagicMock(side_effect=mock_get) + task.xcvr_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl) + + assert task.get_dom_polling_from_config_db(lport) == expected_dom_polling + @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd.delete_port_from_status_table_hw') def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw): @@ -1351,8 +1517,8 @@ def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw assert not task.port_mapping.logical_to_asic assert mock_del_status_tbl_hw.call_count == 1 - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_event_helper.handle_port_config_change', MagicMock()) def test_DomInfoUpdateTask_task_run_stop(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -1363,6 +1529,7 @@ def test_DomInfoUpdateTask_task_run_stop(self): @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status') + @patch('xcvrd.xcvrd.post_port_sfp_firmware_info_to_db') @patch('xcvrd.xcvrd.post_port_dom_info_to_db') @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) @patch('swsscommon.swsscommon.SubscriberStateTable') @@ -1371,10 +1538,10 @@ def test_DomInfoUpdateTask_task_run_stop(self): @patch('xcvrd.xcvrd.post_port_pm_info_to_db') def test_DomInfoUpdateTask_task_worker(self, mock_post_pm_info, mock_update_status_hw, mock_select, mock_sub_table, - mock_post_dom_info, mock_detect_error): + mock_post_dom_info, mock_post_firmware_info, mock_detect_error): mock_selectable = MagicMock() mock_selectable.pop = MagicMock( - side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None)]) + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None), (None, None, None)]) mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) mock_sub_table.return_value = mock_selectable @@ -1383,18 +1550,22 @@ def test_DomInfoUpdateTask_task_worker(self, mock_post_pm_info, mock_update_stat task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) + task.get_dom_polling_from_config_db = MagicMock(return_value='enabled') mock_detect_error.return_value = True task.task_worker() assert task.port_mapping.logical_port_list.count('Ethernet0') assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] + assert mock_post_firmware_info.call_count == 0 assert mock_post_dom_info.call_count == 0 assert mock_update_status_hw.call_count == 0 assert mock_post_pm_info.call_count == 0 mock_detect_error.return_value = False - task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) + task.task_stopping_event.wait = MagicMock(side_effect=[False, False, True]) + task.get_dom_polling_from_config_db = MagicMock(side_effect=('disabled', 'enabled')) task.task_worker() + assert mock_post_firmware_info.call_count == 1 assert mock_post_dom_info.call_count == 1 assert mock_update_status_hw.call_count == 1 assert mock_post_pm_info.call_count == 1 @@ -1445,7 +1616,7 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw assert not task.port_mapping.logical_to_asic assert mock_update_status_hw.call_count == 1 - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None))) def test_SfpStateUpdateTask_task_run_stop(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -1514,8 +1685,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): @patch('time.sleep', MagicMock()) @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock()) - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_event_helper.handle_port_config_change', MagicMock()) @patch('xcvrd.xcvrd.SfpStateUpdateTask.init', MagicMock()) @patch('os.kill') @patch('xcvrd.xcvrd.SfpStateUpdateTask._mapping_event_from_change_event') @@ -1594,7 +1765,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_del_status_hw, assert mock_update_status.call_count == 1 assert mock_post_sfp_info.call_count == 1 assert mock_post_dom_th.call_count == 1 - assert mock_post_firmware_info.call_count == 1 + assert mock_post_firmware_info.call_count == 0 assert mock_update_media_setting.call_count == 1 stop_event.is_set = MagicMock(side_effect=[False, True]) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 38d884ff4..2fa420ffb 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -21,12 +21,14 @@ import traceback import ctypes + from natsort import natsorted from sonic_py_common import daemon_base, device_info, logger from sonic_py_common import multi_asic from swsscommon import swsscommon from .xcvrd_utilities import sfp_status_helper - from .xcvrd_utilities import port_mapping + from .xcvrd_utilities import port_event_helper + from .xcvrd_utilities.port_event_helper import PortChangeObserver from .xcvrd_utilities import media_settings_parser from .xcvrd_utilities import optics_si_parser @@ -520,7 +522,7 @@ def post_port_sfp_info_to_db(logical_port_name, port_mapping, table, transceiver # Update port sfp firmware info in db def post_port_sfp_firmware_info_to_db(logical_port_name, port_mapping, table, - stop_event=threading.Event()): + stop_event=threading.Event(), firmware_info_cache=None): for physical_port, physical_port_name in get_physical_port_name_dict(logical_port_name, port_mapping).items(): if stop_event.is_set(): break @@ -529,8 +531,15 @@ def post_port_sfp_firmware_info_to_db(logical_port_name, port_mapping, table, continue try: - transceiver_firmware_info_dict = _wrapper_get_transceiver_firmware_info(physical_port) - if transceiver_firmware_info_dict is not None: + if firmware_info_cache is not None and physical_port in firmware_info_cache: + # If cache is enabled and firmware information is in cache, just read from cache, no need read from EEPROM + transceiver_firmware_info_dict = firmware_info_cache[physical_port] + else: + transceiver_firmware_info_dict = _wrapper_get_transceiver_firmware_info(physical_port) + if firmware_info_cache is not None: + # If cache is enabled, put firmware information to cache + firmware_info_cache[physical_port] = transceiver_firmware_info_dict + if transceiver_firmware_info_dict: fvs = swsscommon.FieldValuePairs([(k, v) for k, v in transceiver_firmware_info_dict.items()]) table.set(physical_port_name, fvs) else: @@ -758,7 +767,7 @@ class CmisManagerTask(threading.Thread): CMIS_MAX_RETRIES = 3 CMIS_DEF_EXPIRED = 60 # seconds, default expiration time - CMIS_MODULE_TYPES = ['QSFP-DD', 'QSFP_DD', 'OSFP', 'QSFP+C'] + CMIS_MODULE_TYPES = ['QSFP-DD', 'QSFP_DD', 'OSFP', 'OSFP-8X', 'QSFP+C'] CMIS_MAX_HOST_LANES = 8 CMIS_STATE_UNKNOWN = 'UNKNOWN' @@ -1206,7 +1215,7 @@ def wait_for_port_config_done(self, namespace): # Make sure this daemon started after all port configured while not self.task_stopping_event.is_set(): - (state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) + (state, c) = sel.select(port_event_helper.SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: continue if state != swsscommon.Select.OBJECT: @@ -1225,15 +1234,14 @@ def task_worker(self): self.wait_for_port_config_done(namespace) # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal - sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, helper_logger) - while not self.task_stopping_event.is_set(): - # Handle port change event from main thread - port_mapping.handle_port_update_event(sel, - asic_context, + port_change_observer = PortChangeObserver(self.namespaces, helper_logger, self.task_stopping_event, - helper_logger, self.on_port_update_event) + while not self.task_stopping_event.is_set(): + # Handle port change event from main thread + port_change_observer.handle_port_update_event() + for lport, info in self.port_dict.items(): if self.task_stopping_event.is_set(): break @@ -1580,27 +1588,69 @@ def __init__(self, namespaces, port_mapping, main_thread_stop_event): self.port_mapping = copy.deepcopy(port_mapping) self.namespaces = namespaces + def get_dom_polling_from_config_db(self, lport): + """ + Returns the value of dom_polling field from PORT table in CONFIG_DB + For non-breakout ports, this function will get dom_polling field from PORT table of lport (subport = 0) + For breakout ports, this function will get dom_polling field from PORT table of the first subport + of lport's correpsonding breakout group (subport = 1) + + Returns: + 'disabled' if dom_polling is set to 'disabled', otherwise 'enabled' + """ + dom_polling = 'enabled' + + pport_list = self.port_mapping.get_logical_to_physical(lport) + if not pport_list: + helper_logger.log_warning("Get dom disabled: Got unknown physical port list {} for lport {}".format(pport_list, lport)) + return dom_polling + pport = pport_list[0] + + logical_port_list = self.port_mapping.get_physical_to_logical(pport) + if logical_port_list is None: + helper_logger.log_warning("Get dom disabled: Got unknown FP port index {}".format(pport)) + return dom_polling + + # Sort the logical port list to make sure we always get the first subport + logical_port_list = natsorted(logical_port_list, key=lambda y: y.lower()) + first_logical_port = logical_port_list[0] + + asic_index = self.port_mapping.get_asic_id_for_logical_port(first_logical_port) + port_tbl = self.xcvr_table_helper.get_cfg_port_tbl(asic_index) + + found, port_info = port_tbl.get(first_logical_port) + if found and 'dom_polling' in dict(port_info): + dom_polling = dict(port_info)['dom_polling'] + + return dom_polling + + def is_port_dom_monitoring_disabled(self, logical_port_name): + return self.get_dom_polling_from_config_db(logical_port_name) == 'disabled' + def task_worker(self): self.xcvr_table_helper = XcvrTableHelper(self.namespaces) helper_logger.log_info("Start DOM monitoring loop") + firmware_info_cache = {} dom_info_cache = {} - dom_th_info_cache = {} transceiver_status_cache = {} pm_info_cache = {} - sel, asic_context = port_mapping.subscribe_port_config_change(self.namespaces) + sel, asic_context = port_event_helper.subscribe_port_config_change(self.namespaces) # Start loop to update dom info in DB periodically while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): # Clear the cache at the begin of the loop to make sure it will be clear each time + firmware_info_cache.clear() dom_info_cache.clear() - dom_th_info_cache.clear() transceiver_status_cache.clear() pm_info_cache.clear() # Handle port change event from main thread - port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) + port_event_helper.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) logical_port_list = self.port_mapping.logical_port_list for logical_port_name in logical_port_list: + if self.is_port_dom_monitoring_disabled(logical_port_name): + continue + # Get the asic to which this port belongs asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: @@ -1608,6 +1658,12 @@ def task_worker(self): continue if not sfp_status_helper.detect_port_in_error_status(logical_port_name, self.xcvr_table_helper.get_status_tbl(asic_index)): + try: + post_port_sfp_firmware_info_to_db(logical_port_name, self.port_mapping, self.xcvr_table_helper.get_firmware_info_tbl(asic_index), self.task_stopping_event, firmware_info_cache=firmware_info_cache) + except (KeyError, TypeError) as e: + #continue to process next port since execption could be raised due to port reset, transceiver removal + helper_logger.log_warning("Got exception {} while processing firmware info for port {}, ignored".format(repr(e), logical_port_name)) + continue try: post_port_dom_info_to_db(logical_port_name, self.port_mapping, self.xcvr_table_helper.get_dom_tbl(asic_index), self.task_stopping_event, dom_info_cache=dom_info_cache) except (KeyError, TypeError) as e: @@ -1655,7 +1711,7 @@ def join(self): raise self.exc def on_port_config_change(self, port_change_event): - if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + if port_change_event.event_type == port_event_helper.PortChangeEvent.PORT_REMOVE: self.on_remove_logical_port(port_change_event) self.port_mapping.handle_port_change_event(port_change_event) @@ -1665,16 +1721,16 @@ def on_remove_logical_port(self, port_change_event): Args: port_change_event (object): port change event """ - # To avoid race condition, remove the entry TRANSCEIVER_DOM_SENSOR, TRANSCEIVER_PM and HW section of TRANSCEIVER_STATUS table. - # This thread only updates TRANSCEIVER_DOM_SENSOR, TRANSCEIVER_PM and HW section of TRANSCEIVER_STATUS table, - # so we don't have to remove entries from TRANSCEIVER_INFO, TRANSCEIVER_FIRMWARE_INFO and TRANSCEIVER_DOM_THRESHOLD + # To avoid race condition, remove the entry TRANSCEIVER_FIRMWARE_INFO, TRANSCEIVER_DOM_SENSOR, TRANSCEIVER_PM and HW section of TRANSCEIVER_STATUS table. + # This thread only updates TRANSCEIVER_FIRMWARE_INFO, TRANSCEIVER_DOM_SENSOR, TRANSCEIVER_PM and HW section of TRANSCEIVER_STATUS table, + # so we don't have to remove entries from TRANSCEIVER_INFO and TRANSCEIVER_DOM_THRESHOLD del_port_sfp_dom_info_from_db(port_change_event.port_name, self.port_mapping, None, self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id), None, self.xcvr_table_helper.get_pm_tbl(port_change_event.asic_id), - None) + self.xcvr_table_helper.get_firmware_info_tbl(port_change_event.asic_id)) delete_port_from_status_table_hw(port_change_event.port_name, self.port_mapping, self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) @@ -1752,7 +1808,6 @@ def _post_port_sfp_info_and_dom_thr_to_db_once(self, port_mapping, xcvr_table_he rc = post_port_sfp_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict, stop_event) if rc != SFP_EEPROM_NOT_READY: post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_threshold_tbl(asic_index), stop_event) - post_port_sfp_firmware_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_firmware_info_tbl(asic_index), stop_event) # Do not notify media settings during warm reboot to avoid dataplane traffic impact if is_warm_start == False: @@ -1792,7 +1847,7 @@ def _init_port_sfp_status_tbl(self, port_mapping, xcvr_table_helper, stop_event= update_port_transceiver_status_table_sw(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_INSERTED) def init(self): - port_mapping_data = port_mapping.get_port_mapping(self.namespaces) + port_mapping_data = port_event_helper.get_port_mapping(self.namespaces) # Post all the current interface sfp/dom threshold info to STATE_DB self.retry_eeprom_set = self._post_port_sfp_info_and_dom_thr_to_db_once(port_mapping_data, self.xcvr_table_helper, self.main_thread_stop_event) @@ -1879,9 +1934,9 @@ def task_worker(self, stopping_event, sfp_error_event): state = STATE_INIT self.init() - sel, asic_context = port_mapping.subscribe_port_config_change(self.namespaces) + sel, asic_context = port_event_helper.subscribe_port_config_change(self.namespaces) while not stopping_event.is_set(): - port_mapping.handle_port_config_change(sel, asic_context, stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) + port_event_helper.handle_port_config_change(sel, asic_context, stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) # Retry those logical ports whose EEPROM reading failed or timeout when the SFP is inserted self.retry_eeprom_reading() @@ -1978,7 +2033,6 @@ def task_worker(self, stopping_event, sfp_error_event): if rc != SFP_EEPROM_NOT_READY: post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_dom_threshold_tbl(asic_index)) - post_port_sfp_firmware_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_firmware_info_tbl(asic_index)) media_settings_parser.notify_media_setting(logical_port, transceiver_dict, self.xcvr_table_helper.get_app_port_tbl(asic_index), self.xcvr_table_helper.get_cfg_port_tbl(asic_index), self.port_mapping) transceiver_dict.clear() elif value == sfp_status_helper.SFP_STATUS_REMOVED: @@ -2096,10 +2150,10 @@ def join(self): raise self.exc def on_port_config_change(self , port_change_event): - if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + if port_change_event.event_type == port_event_helper.PortChangeEvent.PORT_REMOVE: self.on_remove_logical_port(port_change_event) self.port_mapping.handle_port_change_event(port_change_event) - elif port_change_event.event_type == port_mapping.PortChangeEvent.PORT_ADD: + elif port_change_event.event_type == port_event_helper.PortChangeEvent.PORT_ADD: self.port_mapping.handle_port_change_event(port_change_event) self.on_add_logical_port(port_change_event) @@ -2147,7 +2201,6 @@ def on_add_logical_port(self, port_change_event): status_tbl = self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id) int_tbl = self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id) dom_threshold_tbl = self.xcvr_table_helper.get_dom_threshold_tbl(port_change_event.asic_id) - firmware_info_tbl = self.xcvr_table_helper.get_firmware_info_tbl(port_change_event.asic_id) error_description = 'N/A' status = None @@ -2182,7 +2235,6 @@ def on_add_logical_port(self, port_change_event): self.retry_eeprom_set.add(port_change_event.port_name) else: post_port_dom_threshold_info_to_db(port_change_event.port_name, self.port_mapping, dom_threshold_tbl) - post_port_sfp_firmware_info_to_db(port_change_event.port_name, self.port_mapping, firmware_info_tbl) media_settings_parser.notify_media_setting(port_change_event.port_name, transceiver_dict, self.xcvr_table_helper.get_app_port_tbl(port_change_event.asic_id), self.xcvr_table_helper.get_cfg_port_tbl(port_change_event.asic_id), self.port_mapping) else: status = sfp_status_helper.SFP_STATUS_REMOVED if not status else status @@ -2209,7 +2261,6 @@ def retry_eeprom_reading(self): rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict) if rc != SFP_EEPROM_NOT_READY: post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_dom_threshold_tbl(asic_index)) - post_port_sfp_firmware_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_firmware_info_tbl(asic_index)) media_settings_parser.notify_media_setting(logical_port, transceiver_dict, self.xcvr_table_helper.get_app_port_tbl(asic_index), self.xcvr_table_helper.get_cfg_port_tbl(asic_index), self.port_mapping) transceiver_dict.clear() retry_success_set.add(logical_port) @@ -2255,7 +2306,7 @@ def wait_for_port_config_done(self, namespace): # Make sure this daemon started after all port configured while not self.stop_event.is_set(): - (state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) + (state, c) = sel.select(port_event_helper.SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: continue if state != swsscommon.Select.OBJECT: @@ -2319,14 +2370,14 @@ def init(self): self.wait_for_port_config_done(namespace) self.log_notice("XCVRD INIT: After port config is done") - return port_mapping.get_port_mapping(self.namespaces) + return port_event_helper.get_port_mapping(self.namespaces) # Deinitialize daemon def deinit(self): self.log_info("Start daemon deinit...") # Delete all the information from DB and then exit - port_mapping_data = port_mapping.get_port_mapping(self.namespaces) + port_mapping_data = port_event_helper.get_port_mapping(self.namespaces) logical_port_list = port_mapping_data.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py similarity index 52% rename from sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py rename to sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py index 788dc8134..3c0dbffa3 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py @@ -4,6 +4,11 @@ from swsscommon import swsscommon SELECT_TIMEOUT_MSECS = 1000 +DEFAULT_PORT_TBL_MAP = [ + {'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME}, + {'STATE_DB': 'TRANSCEIVER_INFO'}, + {'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']}, +] class PortChangeEvent: @@ -11,9 +16,9 @@ class PortChangeEvent: PORT_REMOVE = 1 PORT_SET = 2 PORT_DEL = 3 - PORT_EVENT = {} - def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): + def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None, + db_name=None, table_name=None): # Logical port name, e.g. Ethernet0 self.port_name = port_name # Physical port index, equals to "index" field of PORT table in CONFIG_DB @@ -24,6 +29,8 @@ def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): self.event_type = event_type # Port config dict self.port_dict = port_dict + self.db_name = db_name + self.table_name = table_name def __str__(self): return '{} - name={} index={} asic_id={}'.format('Add' if self.event_type == self.PORT_ADD else 'Remove', @@ -31,6 +38,147 @@ def __str__(self): self.port_index, self.asic_id) +class PortChangeObserver: + """ + PortChangeObserver is a class to monitor port change events in DBs, and + notify callback function + """ + + def __init__(self, namespaces, logger, + stop_event, + port_change_event_handler, + port_tbl_map=DEFAULT_PORT_TBL_MAP): + """ + Args: + namespaces (list): List of namespaces to monitor + logger (Logger): Logger object + stop_event (threading.Event): Stop event to stop the observer + port_change_event_handler (function): Callback function to handle port change event + port_tbl_map (list): List of dictionaries, each dictionary contains + the DB name and table name to monitor + """ + # To avoid duplicate event processing, this dict stores the latest port + # change event for each key which is a tuple of + # (port_name, port_tbl.db_name, port_tbl.table_name) + self.port_event_cache = {} + self.namespaces = namespaces + self.logger = logger + self.stop_event = stop_event + self.port_change_event_handler = port_change_event_handler + self.port_tbl_map = port_tbl_map + self.subscribe_port_update_event() + + def apply_filter_to_fvp(self, filter, fvp): + if filter is not None: + for key in fvp.copy().keys(): + if key not in (set(filter) | set({'index', 'port_name', 'asic_id', 'op'})): + del fvp[key] + + def subscribe_port_update_event(self): + """ + Subscribe to a particular DB's table and listen to only interested fields + Format : + { : , , , .. } where only field update will be received + """ + sel = swsscommon.Select() + asic_context = {} + for d in self.port_tbl_map: + for namespace in self.namespaces: + db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0]) + port_tbl.db_name = list(d.keys())[0] + port_tbl.table_name = list(d.values())[0] + port_tbl.filter = d['FILTER'] if 'FILTER' in d else None + asic_context[port_tbl] = asic_id + sel.addSelectable(port_tbl) + self.logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format( + port_tbl, list(d.values())[0], namespace)) + self.sel, self.asic_context = sel, asic_context + + def handle_port_update_event(self): + """ + Select PORT update events, notify the observers upon a port update in CONFIG_DB + or a XCVR insertion/removal in STATE_DB + + Returns: + bool: True if there's at least one update event; False if there's no update event. + """ + has_event = False + if not self.stop_event.is_set(): + (state, _) = self.sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + return has_event + if state != swsscommon.Select.OBJECT: + self.logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') + return has_event + + port_event_cache = {} + for port_tbl in self.asic_context.keys(): + while True: + (port_name, op, fvp) = port_tbl.pop() + if not port_name: + break + if not validate_port(port_name): + continue + fvp = dict(fvp) if fvp is not None else {} + self.logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format( + port_name, op, port_tbl.db_name, port_tbl.table_name, fvp)) + if 'index' not in fvp: + fvp['index'] = '-1' + fvp['port_name'] = port_name + fvp['asic_id'] = self.asic_context[port_tbl] + fvp['op'] = op + fvp['FILTER'] = port_tbl.filter + # Soak duplicate events and consider only the last event + port_event_cache[(port_name, port_tbl.db_name, port_tbl.table_name)] = fvp + + # Now apply filter over soaked events + for key, fvp in port_event_cache.items(): + db_name = key[1] + table_name = key[2] + port_index = int(fvp['index']) + port_change_event = None + filter = fvp['FILTER'] + del fvp['FILTER'] + self.apply_filter_to_fvp(filter, fvp) + + if key in self.port_event_cache: + # Compare current event with last event on this key, to see if + # there's really a need to update. + diff = set(fvp.items()) - set(self.port_event_cache[key].items()) + # Ignore duplicate events + if not diff: + self.port_event_cache[key] = fvp + continue + # Update the latest event to the cache + self.port_event_cache[key] = fvp + + if fvp['op'] == swsscommon.SET_COMMAND: + port_change_event = PortChangeEvent(fvp['port_name'], + port_index, + fvp['asic_id'], + PortChangeEvent.PORT_SET, + fvp, + db_name, + table_name) + elif fvp['op'] == swsscommon.DEL_COMMAND: + port_change_event = PortChangeEvent(fvp['port_name'], + port_index, + fvp['asic_id'], + PortChangeEvent.PORT_DEL, + fvp, + db_name, + table_name) + # This is the final event considered for processing + self.logger.log_warning("*** {} handle_port_update_event() fvp {}".format( + key, fvp)) + if port_change_event is not None: + has_event = True + self.port_change_event_handler(port_change_event) + + return has_event + class PortMapping: def __init__(self): @@ -107,110 +255,6 @@ def subscribe_port_config_change(namespaces): sel.addSelectable(port_tbl) return sel, asic_context -def subscribe_port_update_event(namespaces, logger): - """ - Subscribe to a particular DB's table and listen to only interested fields - Format : - { :
, , , .. } where only field update will be received - """ - port_tbl_map = [ - {'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME}, - {'STATE_DB': 'TRANSCEIVER_INFO'}, - {'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']}, - ] - - sel = swsscommon.Select() - asic_context = {} - for d in port_tbl_map: - for namespace in namespaces: - db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace) - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0]) - port_tbl.db_name = list(d.keys())[0] - port_tbl.table_name = list(d.values())[0] - port_tbl.filter = d['FILTER'] if 'FILTER' in d else None - asic_context[port_tbl] = asic_id - sel.addSelectable(port_tbl) - logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format( - port_tbl, list(d.values())[0], namespace)) - return sel, asic_context - -def apply_filter_to_fvp(filter, fvp): - if filter is not None: - for key in fvp.copy().keys(): - if key not in (set(filter) | set({'index', 'key', 'asic_id', 'op'})): - del fvp[key] - -def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler): - """ - Select PORT update events, notify the observers upon a port update in CONFIG_DB - or a XCVR insertion/removal in STATE_DB - """ - if not stop_event.is_set(): - (state, _) = sel.select(SELECT_TIMEOUT_MSECS) - if state == swsscommon.Select.TIMEOUT: - return - if state != swsscommon.Select.OBJECT: - logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') - return - - port_event_cache = {} - for port_tbl in asic_context.keys(): - while True: - (key, op, fvp) = port_tbl.pop() - if not key: - break - if not validate_port(key): - continue - fvp = dict(fvp) if fvp is not None else {} - logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format( - key, op, port_tbl.db_name, port_tbl.table_name, fvp)) - - if 'index' not in fvp: - fvp['index'] = '-1' - fvp['key'] = key - fvp['asic_id'] = asic_context[port_tbl] - fvp['op'] = op - fvp['FILTER'] = port_tbl.filter - # Soak duplicate events and consider only the last event - port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp - - # Now apply filter over soaked events - for key, fvp in port_event_cache.items(): - port_index = int(fvp['index']) - port_change_event = None - diff = {} - filter = fvp['FILTER'] - del fvp['FILTER'] - apply_filter_to_fvp(filter, fvp) - - if key in PortChangeEvent.PORT_EVENT: - diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items())) - # Ignore duplicate events - if not diff: - PortChangeEvent.PORT_EVENT[key] = fvp - continue - PortChangeEvent.PORT_EVENT[key] = fvp - - if fvp['op'] == swsscommon.SET_COMMAND: - port_change_event = PortChangeEvent(fvp['key'], - port_index, - fvp['asic_id'], - PortChangeEvent.PORT_SET, - fvp) - elif fvp['op'] == swsscommon.DEL_COMMAND: - port_change_event = PortChangeEvent(fvp['key'], - port_index, - fvp['asic_id'], - PortChangeEvent.PORT_DEL, - fvp) - # This is the final event considered for processing - logger.log_warning("*** {} handle_port_update_event() fvp {}".format( - key, fvp)) - if port_change_event is not None: - port_change_event_handler(port_change_event) - - def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers """