From 47c162505824ad7e81dd7f65578dbe7d534e4313 Mon Sep 17 00:00:00 2001 From: liuh-80 Date: Wed, 27 Sep 2023 06:33:28 +0000 Subject: [PATCH 1/6] Suppory async DB update for both ZMQ producer/consumer table. --- common/Makefile.am | 1 + common/asyncdbupdater.cpp | 111 +++++++++++++++++++++++++++++++ common/asyncdbupdater.h | 42 ++++++++++++ common/zmqconsumerstatetable.cpp | 94 ++------------------------ common/zmqconsumerstatetable.h | 22 ++---- common/zmqproducerstatetable.cpp | 86 +++++++++++++++++++++--- common/zmqproducerstatetable.h | 13 ++-- tests/zmq_state_ut.cpp | 33 +++++++-- 8 files changed, 276 insertions(+), 126 deletions(-) create mode 100644 common/asyncdbupdater.cpp create mode 100644 common/asyncdbupdater.h diff --git a/common/Makefile.am b/common/Makefile.am index 7f39d5f1..5ea63f82 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -70,6 +70,7 @@ common_libswsscommon_la_SOURCES = \ common/profileprovider.cpp \ common/zmqclient.cpp \ common/zmqserver.cpp \ + common/asyncdbupdater.cpp \ common/redis_table_waiter.cpp common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS) diff --git a/common/asyncdbupdater.cpp b/common/asyncdbupdater.cpp new file mode 100644 index 00000000..fad61db5 --- /dev/null +++ b/common/asyncdbupdater.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include +#include "asyncdbupdater.h" +#include "dbconnector.h" +#include "redisselect.h" +#include "redisapi.h" +#include "table.h" + +using namespace std; + +namespace swss { + +AsyncDBUpdater::AsyncDBUpdater(DBConnector *db, const std::string &tableName) + : m_db(db) + , m_tableName(tableName) +{ + m_runThread = true; + m_dbUpdateThread = std::make_shared(&AsyncDBUpdater::dbUpdateThread, this); + + SWSS_LOG_DEBUG("AsyncDBUpdater ctor tableName: %s", tableName.c_str()); +} + +AsyncDBUpdater::~AsyncDBUpdater() +{ + m_runThread = false; + + // notify db update thread exit + m_dbUpdateDataNotifyCv.notify_all(); + m_dbUpdateThread->join(); +} + +void AsyncDBUpdater::update(std::shared_ptr pkco) +{ + { + std::lock_guard lock(m_dbUpdateDataQueueMutex); + m_dbUpdateDataQueue.push(pkco); + } + + m_dbUpdateDataNotifyCv.notify_all(); +} + +void AsyncDBUpdater::dbUpdateThread() +{ + SWSS_LOG_ENTER(); + SWSS_LOG_NOTICE("dbUpdateThread begin"); + + // Different schedule policy has different min priority + pthread_attr_t attr; + int policy; + pthread_attr_getschedpolicy(&attr, &policy); + int min_priority = sched_get_priority_min(policy); + // Use min priority will block poll thread + pthread_setschedprio(pthread_self(), min_priority + 1); + + // Follow same logic in ConsumerStateTable: every received data will write to 'table'. + DBConnector db(m_db->getDbName(), 0, true); + Table table(&db, m_tableName); + std::mutex cvMutex; + std::unique_lock cvLock(cvMutex); + + while (m_runThread) + { + m_dbUpdateDataNotifyCv.wait(cvLock); + + size_t count; + { + // size() is not thread safe + std::lock_guard lock(m_dbUpdateDataQueueMutex); + + // For new data append to m_dataQueue during pops, will not be include in result. + count = m_dbUpdateDataQueue.size(); + if (!count) + { + continue; + } + + } + + for (size_t ie = 0; ie < count; ie++) + { + auto& kco = *(m_dbUpdateDataQueue.front()); + + if (kfvOp(kco) == SET_COMMAND) + { + auto& values = kfvFieldsValues(kco); + + // Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry. + table.del(kfvKey(kco)); + table.set(kfvKey(kco), values); + } + else if (kfvOp(kco) == DEL_COMMAND) + { + table.del(kfvKey(kco)); + } + else + { + SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str()); + } + + { + std::lock_guard lock(m_dbUpdateDataQueueMutex); + m_dbUpdateDataQueue.pop(); + } + } + } +} + +} diff --git a/common/asyncdbupdater.h b/common/asyncdbupdater.h new file mode 100644 index 00000000..31549559 --- /dev/null +++ b/common/asyncdbupdater.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include +#include "dbconnector.h" +#include "table.h" + +#define MQ_RESPONSE_MAX_COUNT (4*1024*1024) +#define MQ_SIZE 100 +#define MQ_MAX_RETRY 10 +#define MQ_POLL_TIMEOUT (1000) + +namespace swss { + +class AsyncDBUpdater +{ +public: + AsyncDBUpdater(DBConnector *db, const std::string &tableName); + ~AsyncDBUpdater(); + + void update(std::shared_ptr pkco); + +private: + void dbUpdateThread(); + + volatile bool m_runThread; + + std::shared_ptr m_dbUpdateThread; + + std::mutex m_dbUpdateDataQueueMutex; + + std::condition_variable m_dbUpdateDataNotifyCv; + + std::queue> m_dbUpdateDataQueue; + + DBConnector *m_db; + + std::string m_tableName; +}; + +} diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index 217b7cf3..305999c7 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -26,13 +26,12 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string if (dbPersistence) { SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); - m_runThread = true; - m_dbUpdateThread = std::make_shared(&ZmqConsumerStateTable::dbUpdateThread, this); + m_asyncDBUpdater = std::make_unique(db, tableName); } else { SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str()); - m_dbUpdateThread = nullptr; + m_asyncDBUpdater = nullptr; } m_zmqServer.registerMessageHandler(m_db->getDbName(), tableName, this); @@ -40,22 +39,10 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str()); } -ZmqConsumerStateTable::~ZmqConsumerStateTable() -{ - if (m_dbUpdateThread != nullptr) - { - m_runThread = false; - - // notify db update thread exit - m_dbUpdateDataNotifyCv.notify_all(); - m_dbUpdateThread->join(); - } -} - void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr pkco) { std::shared_ptr clone = nullptr; - if (m_dbUpdateThread != nullptr) + if (m_asyncDBUpdater != nullptr) { // clone before put to received queue, because received data may change by consumer. clone = std::make_shared(*pkco); @@ -68,80 +55,9 @@ void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr lock(m_dbUpdateDataQueueMutex); - m_dbUpdateDataQueue.push(clone); - } - - m_dbUpdateDataNotifyCv.notify_all(); - } -} - -void ZmqConsumerStateTable::dbUpdateThread() -{ - SWSS_LOG_ENTER(); - SWSS_LOG_NOTICE("dbUpdateThread begin"); - - // Different schedule policy has different min priority - pthread_attr_t attr; - int policy; - pthread_attr_getschedpolicy(&attr, &policy); - int min_priority = sched_get_priority_min(policy); - // Use min priority will block poll thread - pthread_setschedprio(pthread_self(), min_priority + 1); - - // Follow same logic in ConsumerStateTable: every received data will write to 'table'. - DBConnector db(m_db->getDbName(), 0, true); - Table table(&db, getTableName()); - std::mutex cvMutex; - std::unique_lock cvLock(cvMutex); - - while (m_runThread) - { - m_dbUpdateDataNotifyCv.wait(cvLock); - - size_t count; - { - // size() is not thread safe - std::lock_guard lock(m_dbUpdateDataQueueMutex); - - // For new data append to m_dataQueue during pops, will not be include in result. - count = m_dbUpdateDataQueue.size(); - if (!count) - { - continue; - } - - } - - for (size_t ie = 0; ie < count; ie++) - { - auto& kco = *(m_dbUpdateDataQueue.front()); - - if (kfvOp(kco) == SET_COMMAND) - { - auto& values = kfvFieldsValues(kco); - - // Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry. - table.del(kfvKey(kco)); - table.set(kfvKey(kco), values); - } - else if (kfvOp(kco) == DEL_COMMAND) - { - table.del(kfvKey(kco)); - } - else - { - SWSS_LOG_ERROR("zmq consumer table: %s, receive unknown operation: %s", getTableName().c_str(), kfvOp(kco).c_str()); - } - - { - std::lock_guard lock(m_dbUpdateDataQueueMutex); - m_dbUpdateDataQueue.pop(); - } - } + m_asyncDBUpdater->update(clone); } } diff --git a/common/zmqconsumerstatetable.h b/common/zmqconsumerstatetable.h index e61a666a..a00445cb 100644 --- a/common/zmqconsumerstatetable.h +++ b/common/zmqconsumerstatetable.h @@ -3,10 +3,11 @@ #include #include #include -#include "dbconnector.h" -#include "table.h" +#include "asyncdbupdater.h" #include "consumertablebase.h" +#include "dbconnector.h" #include "selectableevent.h" +#include "table.h" #include "zmqserver.h" #define MQ_RESPONSE_MAX_COUNT (4*1024*1024) @@ -22,8 +23,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes /* The default value of pop batch size is 128 */ static constexpr int DEFAULT_POP_BATCH_SIZE = 128; - ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = true); - ~ZmqConsumerStateTable(); + ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = false); /* Get multiple pop elements */ void pops(std::deque &vkco, const std::string &prefix = EMPTY_PREFIX); @@ -75,27 +75,17 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes private: void handleReceivedData(std::shared_ptr pkco); - void dbUpdateThread(); - - volatile bool m_runThread; - std::mutex m_receivedQueueMutex; std::queue> m_receivedOperationQueue; swss::SelectableEvent m_selectableEvent; - std::shared_ptr m_dbUpdateThread; - - std::mutex m_dbUpdateDataQueueMutex; - - std::condition_variable m_dbUpdateDataNotifyCv; - - std::queue> m_dbUpdateDataQueue; - DBConnector *m_db; ZmqServer& m_zmqServer; + + std::unique_ptr m_asyncDBUpdater; }; } diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index d959b920..ef8d541a 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -18,27 +18,38 @@ using namespace std; namespace swss { -ZmqProducerStateTable::ZmqProducerStateTable(DBConnector *db, const string &tableName, ZmqClient &zmqClient) +ZmqProducerStateTable::ZmqProducerStateTable(DBConnector *db, const string &tableName, ZmqClient &zmqClient, bool dbPersistence) : ProducerStateTable(db, tableName), m_zmqClient(zmqClient), m_dbName(db->getDbName()), m_tableNameStr(tableName) { - initialize(); + initialize(db, tableName, dbPersistence); } -ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const string &tableName, ZmqClient &zmqClient, bool buffered) +ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const string &tableName, ZmqClient &zmqClient, bool buffered, bool dbPersistence) : ProducerStateTable(pipeline, tableName, buffered), m_zmqClient(zmqClient), m_dbName(pipeline->getDbName()), m_tableNameStr(tableName) { - initialize(); + initialize(pipeline->getDBConnector(), tableName, dbPersistence); } -void ZmqProducerStateTable::initialize() +void ZmqProducerStateTable::initialize(DBConnector *db, const std::string &tableName, bool dbPersistence) { m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); + + if (dbPersistence) + { + SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); + m_asyncDBUpdater = std::make_unique(db, tableName); + } + else + { + SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str()); + m_asyncDBUpdater = nullptr; + } } void ZmqProducerStateTable::set( @@ -54,6 +65,20 @@ void ZmqProducerStateTable::set( m_dbName, m_tableNameStr, m_sendbuffer); + + if (m_asyncDBUpdater != nullptr) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(); + kfvKey(*clone) = key; + kfvOp(*clone) = op; + for(const auto &value : values) + { + kfvFieldsValues(*clone).push_back(value); + } + + m_asyncDBUpdater->update(clone); + } } void ZmqProducerStateTable::del( @@ -68,16 +93,39 @@ void ZmqProducerStateTable::del( m_dbName, m_tableNameStr, m_sendbuffer); + + if (m_asyncDBUpdater != nullptr) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(); + kfvKey(*clone) = key; + kfvOp(*clone) = op; + + m_asyncDBUpdater->update(clone); + } } void ZmqProducerStateTable::set(const std::vector &values) { for (const auto &value : values) { - set( - kfvKey(value), - kfvFieldsValues(value), - SET_COMMAND); + m_zmqClient.sendMsg( + kfvKey(value), + kfvFieldsValues(value), + SET_COMMAND, + m_dbName, + m_tableNameStr, + m_sendbuffer); + } + + if (m_asyncDBUpdater != nullptr) + { + for (const auto &value : values) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(value); + m_asyncDBUpdater->update(clone); + } } } @@ -85,7 +133,25 @@ void ZmqProducerStateTable::del(const std::vector &keys) { for (const auto &key : keys) { - del(key, DEL_COMMAND); + m_zmqClient.sendMsg( + key, + vector(), + DEL_COMMAND, + m_dbName, + m_tableNameStr, + m_sendbuffer); + } + + if (m_asyncDBUpdater != nullptr) + { + for (const auto &key : keys) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(); + kfvKey(*clone) = key; + kfvOp(*clone) = DEL_COMMAND; + m_asyncDBUpdater->update(clone); + } } } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 34ea3d71..405e27d2 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -5,9 +5,10 @@ #include #include #include -#include "table.h" -#include "redispipeline.h" +#include "asyncdbupdater.h" #include "producerstatetable.h" +#include "redispipeline.h" +#include "table.h" #include "zmqclient.h" namespace swss { @@ -15,8 +16,8 @@ namespace swss { class ZmqProducerStateTable : public ProducerStateTable { public: - ZmqProducerStateTable(DBConnector *db, const std::string &tableName, ZmqClient &zmqClient); - ZmqProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, ZmqClient &zmqClient, bool buffered = false); + ZmqProducerStateTable(DBConnector *db, const std::string &tableName, ZmqClient &zmqClient, bool dbPersistence = true); + ZmqProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, ZmqClient &zmqClient, bool buffered = false, bool dbPersistence = true); /* Implements set() and del() commands using notification messages */ virtual void set(const std::string &key, @@ -34,7 +35,7 @@ class ZmqProducerStateTable : public ProducerStateTable virtual void del(const std::vector &keys); private: - void initialize(); + void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); ZmqClient& m_zmqClient; @@ -42,6 +43,8 @@ class ZmqProducerStateTable : public ProducerStateTable const std::string m_dbName; const std::string m_tableNameStr; + + std::unique_ptr m_asyncDBUpdater; }; } diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 51e96a96..68dd8bb0 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -54,11 +54,11 @@ static inline int readNumberAtEOL(const string& str) static bool allDataReceived = false; -static void producerWorker(string tableName, string endpoint) +static void producerWorker(string tableName, string endpoint, bool dbPersistence) { DBConnector db(TEST_DB, 0, true); ZmqClient client(endpoint); - ZmqProducerStateTable p(&db, tableName, client); + ZmqProducerStateTable p(&db, tableName, client, false); cout << "Producer thread started: " << tableName << endl; for (int i = 0; i < NUMBER_OF_OPS; i++) @@ -123,7 +123,7 @@ static int delCount = 0; static int batchSetCount = 0; static int batchDelCount = 0; -static void consumerWorker(string tableName, string endpoint) +static void consumerWorker(string tableName, string endpoint, bool dbPersistence) { cout << "Consumer thread started: " << tableName << endl; @@ -183,21 +183,28 @@ static void consumerWorker(string tableName, string endpoint) cout << "Consumer thread ended: " << tableName << endl; } -TEST(ZmqConsumerStateTable, test) + +static void testMethod(bool producerPersistence) { std::string testTableName = "ZMQ_PROD_CONS_UT"; std::string pushEndpoint = "tcp://localhost:1234"; std::string pullEndpoint = "tcp://*:1234"; thread *producerThreads[NUMBER_OF_THREADS]; + // reset receive data counter + setCount = 0; + delCount = 0; + batchSetCount = 0; + batchDelCount = 0; + // start consumer first, SHM can only have 1 consumer per table. - thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint); + thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) { - producerThreads[i] = new thread(producerWorker, testTableName, pushEndpoint); + producerThreads[i] = new thread(producerWorker, testTableName, pushEndpoint, producerPersistence); } cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl; @@ -215,6 +222,7 @@ TEST(ZmqConsumerStateTable, test) EXPECT_EQ(batchSetCount, NUMBER_OF_THREADS * NUMBER_OF_OPS * MAX_KEYS); EXPECT_EQ(batchDelCount, NUMBER_OF_THREADS * NUMBER_OF_OPS * MAX_KEYS); + // check presist data in redis DBConnector db(TEST_DB, 0, true); Table table(&db, testTableName); std::vector keys; @@ -237,3 +245,16 @@ TEST(ZmqConsumerStateTable, test) cout << endl << "Done." << endl; } + +TEST(ZmqConsumerStateTable, test) +{ + // test with persist by consumer + testMethod(false); +} + + +TEST(ZmqProducerStateTable, test) +{ + // test with persist by producer + testMethod(true); +} \ No newline at end of file From 6db388632f52ebec0da171c72caaa11a234c6e99 Mon Sep 17 00:00:00 2001 From: liuh-80 Date: Wed, 27 Sep 2023 07:05:22 +0000 Subject: [PATCH 2/6] Fix UT --- tests/zmq_state_ut.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 68dd8bb0..3476b646 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -58,7 +58,7 @@ static void producerWorker(string tableName, string endpoint, bool dbPersistence { DBConnector db(TEST_DB, 0, true); ZmqClient client(endpoint); - ZmqProducerStateTable p(&db, tableName, client, false); + ZmqProducerStateTable p(&db, tableName, client, dbPersistence); cout << "Producer thread started: " << tableName << endl; for (int i = 0; i < NUMBER_OF_OPS; i++) @@ -129,7 +129,7 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence DBConnector db(TEST_DB, 0, true); ZmqServer server(endpoint); - ZmqConsumerStateTable c(&db, tableName, server); + ZmqConsumerStateTable c(&db, tableName, server, dbPersistence); Select cs; cs.addSelectable(&c); From 12fda2281238e94ed432ffb459f58bda85aec0b8 Mon Sep 17 00:00:00 2001 From: liuh-80 Date: Wed, 27 Sep 2023 09:30:30 +0000 Subject: [PATCH 3/6] Improve UT and code --- common/asyncdbupdater.cpp | 15 +++++++++------ common/asyncdbupdater.h | 1 + common/zmqconsumerstatetable.cpp | 10 ++++++++++ common/zmqconsumerstatetable.h | 2 ++ common/zmqproducerstatetable.cpp | 10 ++++++++++ common/zmqproducerstatetable.h | 1 + tests/zmq_state_ut.cpp | 11 +++++++++-- 7 files changed, 42 insertions(+), 8 deletions(-) diff --git a/common/asyncdbupdater.cpp b/common/asyncdbupdater.cpp index fad61db5..ee7e9c65 100644 --- a/common/asyncdbupdater.cpp +++ b/common/asyncdbupdater.cpp @@ -67,16 +67,11 @@ void AsyncDBUpdater::dbUpdateThread() size_t count; { - // size() is not thread safe - std::lock_guard lock(m_dbUpdateDataQueueMutex); - - // For new data append to m_dataQueue during pops, will not be include in result. - count = m_dbUpdateDataQueue.size(); + count = queueSize(); if (!count) { continue; } - } for (size_t ie = 0; ie < count; ie++) @@ -108,4 +103,12 @@ void AsyncDBUpdater::dbUpdateThread() } } +size_t AsyncDBUpdater::queueSize() +{ + // size() is not thread safe + std::lock_guard lock(m_dbUpdateDataQueueMutex); + + return m_dbUpdateDataQueue.size(); +} + } diff --git a/common/asyncdbupdater.h b/common/asyncdbupdater.h index 31549559..4826661a 100644 --- a/common/asyncdbupdater.h +++ b/common/asyncdbupdater.h @@ -21,6 +21,7 @@ class AsyncDBUpdater void update(std::shared_ptr pkco); + size_t queueSize(); private: void dbUpdateThread(); diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index 305999c7..c960882d 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -90,4 +90,14 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const } } +size_t ZmqConsumerStateTable::dbUpdaterQueueSize() +{ + if (m_asyncDBUpdater == nullptr) + { + return 0; + } + + return m_asyncDBUpdater->queueSize(); +} + } diff --git a/common/zmqconsumerstatetable.h b/common/zmqconsumerstatetable.h index a00445cb..1b0effcc 100644 --- a/common/zmqconsumerstatetable.h +++ b/common/zmqconsumerstatetable.h @@ -72,6 +72,8 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes return m_db; } + size_t dbUpdaterQueueSize(); + private: void handleReceivedData(std::shared_ptr pkco); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index ef8d541a..be623ce9 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -155,4 +155,14 @@ void ZmqProducerStateTable::del(const std::vector &keys) } } +size_t ZmqProducerStateTable::dbUpdaterQueueSize() +{ + if (m_asyncDBUpdater == nullptr) + { + return 0; + } + + return m_asyncDBUpdater->queueSize(); +} + } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 405e27d2..8c784d42 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -34,6 +34,7 @@ class ZmqProducerStateTable : public ProducerStateTable virtual void del(const std::vector &keys); + size_t dbUpdaterQueueSize(); private: void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 3476b646..80af1e98 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -109,7 +109,8 @@ static void producerWorker(string tableName, string endpoint, bool dbPersistence p.del(keys); } - while (!allDataReceived) + // wait all data been received by consumer and all persist data write to redis + while (!allDataReceived && (p.dbUpdaterQueueSize() > 0)) { sleep(1); } @@ -129,7 +130,7 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence DBConnector db(TEST_DB, 0, true); ZmqServer server(endpoint); - ZmqConsumerStateTable c(&db, tableName, server, dbPersistence); + ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence); Select cs; cs.addSelectable(&c); @@ -179,6 +180,12 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence } } + // wait all persist data write to redis + while (c.dbUpdaterQueueSize() > 0) + { + sleep(1); + } + allDataReceived = true; cout << "Consumer thread ended: " << tableName << endl; } From 46c0ba90f0ea8365e96a00a96b885dc71e30a85e Mon Sep 17 00:00:00 2001 From: liuh-80 Date: Wed, 27 Sep 2023 10:17:59 +0000 Subject: [PATCH 4/6] Improve code and UT --- common/asyncdbupdater.cpp | 14 +++++++------- tests/zmq_state_ut.cpp | 26 ++++++++++++++++++++------ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/common/asyncdbupdater.cpp b/common/asyncdbupdater.cpp index ee7e9c65..5d659330 100644 --- a/common/asyncdbupdater.cpp +++ b/common/asyncdbupdater.cpp @@ -63,15 +63,13 @@ void AsyncDBUpdater::dbUpdateThread() while (m_runThread) { - m_dbUpdateDataNotifyCv.wait(cvLock); - size_t count; + count = queueSize(); + if (count == 0) { - count = queueSize(); - if (!count) - { - continue; - } + // when queue is empty, wait notification, when data come, continue to check queue size again + m_dbUpdateDataNotifyCv.wait(cvLock); + continue; } for (size_t ie = 0; ie < count; ie++) @@ -101,6 +99,8 @@ void AsyncDBUpdater::dbUpdateThread() } } } + + SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str()); } size_t AsyncDBUpdater::queueSize() diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 80af1e98..1e9d2254 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -109,12 +109,21 @@ static void producerWorker(string tableName, string endpoint, bool dbPersistence p.del(keys); } - // wait all data been received by consumer and all persist data write to redis - while (!allDataReceived && (p.dbUpdaterQueueSize() > 0)) + // wait all data been received by consumer + while (!allDataReceived) { sleep(1); } + if (dbPersistence) + { + // wait all persist data write to redis + while (p.dbUpdaterQueueSize() > 0) + { + sleep(1); + } + } + cout << "Producer thread ended: " << tableName << endl; } @@ -180,13 +189,17 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence } } - // wait all persist data write to redis - while (c.dbUpdaterQueueSize() > 0) + allDataReceived = true; + + if (dbPersistence) { - sleep(1); + // wait all persist data write to redis + while (c.dbUpdaterQueueSize() > 0) + { + sleep(1); + } } - allDataReceived = true; cout << "Consumer thread ended: " << tableName << endl; } @@ -203,6 +216,7 @@ static void testMethod(bool producerPersistence) delCount = 0; batchSetCount = 0; batchDelCount = 0; + allDataReceived = false; // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); From 86d7b9514904572bec8f9c69a185c0c1cbd73cba Mon Sep 17 00:00:00 2001 From: liuh-80 Date: Sat, 7 Oct 2023 02:39:10 +0000 Subject: [PATCH 5/6] Fix PR comments --- common/zmqconsumerstatetable.cpp | 3 ++- common/zmqproducerstatetable.cpp | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index c960882d..5795f1fa 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -94,7 +94,8 @@ size_t ZmqConsumerStateTable::dbUpdaterQueueSize() { if (m_asyncDBUpdater == nullptr) { - return 0; + throw system_error(make_error_code(errc::operation_not_supported), + "Database persistence is not enabled"); } return m_asyncDBUpdater->queueSize(); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index be623ce9..b2afbb75 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -159,7 +159,8 @@ size_t ZmqProducerStateTable::dbUpdaterQueueSize() { if (m_asyncDBUpdater == nullptr) { - return 0; + throw system_error(make_error_code(errc::operation_not_supported), + "Database persistence is not enabled"); } return m_asyncDBUpdater->queueSize(); From 0281a961e8f37dce96412d31351c5b050fa7a1ce Mon Sep 17 00:00:00 2001 From: liuh-80 Date: Tue, 10 Oct 2023 06:06:06 +0000 Subject: [PATCH 6/6] Fix PR comments --- common/zmqconsumerstatetable.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/common/zmqconsumerstatetable.h b/common/zmqconsumerstatetable.h index 1b0effcc..d18e5dc2 100644 --- a/common/zmqconsumerstatetable.h +++ b/common/zmqconsumerstatetable.h @@ -10,11 +10,6 @@ #include "table.h" #include "zmqserver.h" -#define MQ_RESPONSE_MAX_COUNT (4*1024*1024) -#define MQ_SIZE 100 -#define MQ_MAX_RETRY 10 -#define MQ_POLL_TIMEOUT (1000) - namespace swss { class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMessageHandler