From 42b394ddef625247d41dfb83fe29171715c4e857 Mon Sep 17 00:00:00 2001 From: arlakshm <55814491+arlakshm@users.noreply.github.com> Date: Thu, 29 Apr 2021 12:30:06 -0700 Subject: [PATCH] Add support for config_db subscribe and unsubscribe python apis (#481) The PR https://github.com/Azure/sonic-buildimage/pull/6920 adds the support for swsscommon in `hostcfgd`. The PR test failure are seen because the `config_db.subscribe` doesnt work with swsscommon. This PR adds the support for the following python APIs - `config_db.subscribe` - `config_db.unsubscribe` --- common/configdb.h | 7 +++++ tests/test_redis_ut.py | 58 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/common/configdb.h b/common/configdb.h index a71b73c8f..f8b77ee69 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -153,6 +153,13 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native handler = self.handlers[table] handler(table, key, data) + def subscribe(self, table, handler): + self.handlers[table] = handler + + def unsubscribe(self, table): + if table in self.handlers: + self.handlers.pop(table) + def set_entry(self, table, key, data): key = self.serialize_key(key) raw_data = self.typed_to_raw(data) diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index 262db75a0..7740532bf 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -437,3 +437,61 @@ def test_multidb_ConfigDBConnector(): SonicDBConfig.load_sonic_global_db_config(global_db_config) config_db = ConfigDBConnector(use_unix_socket_path=True, namespace='asic1') assert config_db.namespace == 'asic1' + + +def test_ConfigDBSubscribe(): + table_name = 'TEST_TABLE' + test_key = 'key1' + test_data = {'field1': 'value1'} + global output_data + global stop_listen_thread + output_data = "" + stop_listen_thread = False + + def test_handler(key, data): + global output_data + assert key == test_key + assert data == test_data + output_data = test_data['field1'] + + # the config_db.listen() is a blocking function so could not use that in the tests + # this function has similar logic with a way to exit the listen function + def listen_thread_func(config_db): + global stop_listen_thread + pubsub = config_db.get_redis_client(config_db.db_name).pubsub() + pubsub.psubscribe( + "__keyspace@{}__:*".format(config_db.get_dbid(config_db.db_name))) + time.sleep(2) + while True: + if stop_listen_thread: + break + item = pubsub.listen_message() + if 'type' in item and item['type'] == 'pmessage': + key = item['channel'].split(':', 1)[1] + try: + (table, row) = key.split(config_db.TABLE_NAME_SEPARATOR, 1) + if table in config_db.handlers: + client = config_db.get_redis_client(config_db.db_name) + data = config_db.raw_to_typed(client.hgetall(key)) + config_db._ConfigDBConnector__fire(table, row, data) + except ValueError: + pass # Ignore non table-formated redis entries + + config_db = ConfigDBConnector() + config_db.connect(wait_for_init=False) + client = config_db.get_redis_client(config_db.CONFIG_DB) + client.flushdb() + config_db.subscribe(table_name, lambda table, key, + data: test_handler(key, data)) + assert table_name in config_db.handlers + + thread = Thread(target=listen_thread_func, args=(config_db,)) + thread.start() + time.sleep(5) + config_db.set_entry(table_name, test_key, test_data) + stop_listen_thread = True + thread.join() + assert output_data == test_data['field1'] + + config_db.unsubscribe(table_name) + assert table_name not in config_db.handlers