Skip to content

Commit

Permalink
Add support for config_db subscribe and unsubscribe python apis (#481)
Browse files Browse the repository at this point in the history
The PR sonic-net/sonic-buildimage#6920 adds the support for swsscommon in `hostcfgd`. The PR test failure are seen because the `config_db.subscribe` doesnt work with swsscommon. This PR adds the support for the following python APIs
 - `config_db.subscribe`
 - `config_db.unsubscribe`
  • Loading branch information
arlakshm authored Apr 29, 2021
1 parent f3e1085 commit 42b394d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
7 changes: 7 additions & 0 deletions common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
handler = self.handlers[table]
handler(table, key, data)

def subscribe(self, table, handler):
self.handlers[table] = handler

def unsubscribe(self, table):
if table in self.handlers:
self.handlers.pop(table)

def set_entry(self, table, key, data):
key = self.serialize_key(key)
raw_data = self.typed_to_raw(data)
Expand Down
58 changes: 58 additions & 0 deletions tests/test_redis_ut.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,61 @@ def test_multidb_ConfigDBConnector():
SonicDBConfig.load_sonic_global_db_config(global_db_config)
config_db = ConfigDBConnector(use_unix_socket_path=True, namespace='asic1')
assert config_db.namespace == 'asic1'


def test_ConfigDBSubscribe():
table_name = 'TEST_TABLE'
test_key = 'key1'
test_data = {'field1': 'value1'}
global output_data
global stop_listen_thread
output_data = ""
stop_listen_thread = False

def test_handler(key, data):
global output_data
assert key == test_key
assert data == test_data
output_data = test_data['field1']

# the config_db.listen() is a blocking function so could not use that in the tests
# this function has similar logic with a way to exit the listen function
def listen_thread_func(config_db):
global stop_listen_thread
pubsub = config_db.get_redis_client(config_db.db_name).pubsub()
pubsub.psubscribe(
"__keyspace@{}__:*".format(config_db.get_dbid(config_db.db_name)))
time.sleep(2)
while True:
if stop_listen_thread:
break
item = pubsub.listen_message()
if 'type' in item and item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
(table, row) = key.split(config_db.TABLE_NAME_SEPARATOR, 1)
if table in config_db.handlers:
client = config_db.get_redis_client(config_db.db_name)
data = config_db.raw_to_typed(client.hgetall(key))
config_db._ConfigDBConnector__fire(table, row, data)
except ValueError:
pass # Ignore non table-formated redis entries

config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)
client = config_db.get_redis_client(config_db.CONFIG_DB)
client.flushdb()
config_db.subscribe(table_name, lambda table, key,
data: test_handler(key, data))
assert table_name in config_db.handlers

thread = Thread(target=listen_thread_func, args=(config_db,))
thread.start()
time.sleep(5)
config_db.set_entry(table_name, test_key, test_data)
stop_listen_thread = True
thread.join()
assert output_data == test_data['field1']

config_db.unsubscribe(table_name)
assert table_name not in config_db.handlers

0 comments on commit 42b394d

Please sign in to comment.