diff --git a/common/Makefile.am b/common/Makefile.am index 7f39d5f1..5ea63f82 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -70,6 +70,7 @@ common_libswsscommon_la_SOURCES = \ common/profileprovider.cpp \ common/zmqclient.cpp \ common/zmqserver.cpp \ + common/asyncdbupdater.cpp \ common/redis_table_waiter.cpp common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS) diff --git a/common/asyncdbupdater.cpp b/common/asyncdbupdater.cpp new file mode 100644 index 00000000..5d659330 --- /dev/null +++ b/common/asyncdbupdater.cpp @@ -0,0 +1,114 @@ +#include +#include +#include +#include +#include +#include "asyncdbupdater.h" +#include "dbconnector.h" +#include "redisselect.h" +#include "redisapi.h" +#include "table.h" + +using namespace std; + +namespace swss { + +AsyncDBUpdater::AsyncDBUpdater(DBConnector *db, const std::string &tableName) + : m_db(db) + , m_tableName(tableName) +{ + m_runThread = true; + m_dbUpdateThread = std::make_shared(&AsyncDBUpdater::dbUpdateThread, this); + + SWSS_LOG_DEBUG("AsyncDBUpdater ctor tableName: %s", tableName.c_str()); +} + +AsyncDBUpdater::~AsyncDBUpdater() +{ + m_runThread = false; + + // notify db update thread exit + m_dbUpdateDataNotifyCv.notify_all(); + m_dbUpdateThread->join(); +} + +void AsyncDBUpdater::update(std::shared_ptr pkco) +{ + { + std::lock_guard lock(m_dbUpdateDataQueueMutex); + m_dbUpdateDataQueue.push(pkco); + } + + m_dbUpdateDataNotifyCv.notify_all(); +} + +void AsyncDBUpdater::dbUpdateThread() +{ + SWSS_LOG_ENTER(); + SWSS_LOG_NOTICE("dbUpdateThread begin"); + + // Different schedule policy has different min priority + pthread_attr_t attr; + int policy; + pthread_attr_getschedpolicy(&attr, &policy); + int min_priority = sched_get_priority_min(policy); + // Use min priority will block poll thread + pthread_setschedprio(pthread_self(), min_priority + 1); + + // Follow same logic in ConsumerStateTable: every received data will write to 'table'. + DBConnector db(m_db->getDbName(), 0, true); + Table table(&db, m_tableName); + std::mutex cvMutex; + std::unique_lock cvLock(cvMutex); + + while (m_runThread) + { + size_t count; + count = queueSize(); + if (count == 0) + { + // when queue is empty, wait notification, when data come, continue to check queue size again + m_dbUpdateDataNotifyCv.wait(cvLock); + continue; + } + + for (size_t ie = 0; ie < count; ie++) + { + auto& kco = *(m_dbUpdateDataQueue.front()); + + if (kfvOp(kco) == SET_COMMAND) + { + auto& values = kfvFieldsValues(kco); + + // Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry. + table.del(kfvKey(kco)); + table.set(kfvKey(kco), values); + } + else if (kfvOp(kco) == DEL_COMMAND) + { + table.del(kfvKey(kco)); + } + else + { + SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str()); + } + + { + std::lock_guard lock(m_dbUpdateDataQueueMutex); + m_dbUpdateDataQueue.pop(); + } + } + } + + SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str()); +} + +size_t AsyncDBUpdater::queueSize() +{ + // size() is not thread safe + std::lock_guard lock(m_dbUpdateDataQueueMutex); + + return m_dbUpdateDataQueue.size(); +} + +} diff --git a/common/asyncdbupdater.h b/common/asyncdbupdater.h new file mode 100644 index 00000000..4826661a --- /dev/null +++ b/common/asyncdbupdater.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include +#include +#include "dbconnector.h" +#include "table.h" + +#define MQ_RESPONSE_MAX_COUNT (4*1024*1024) +#define MQ_SIZE 100 +#define MQ_MAX_RETRY 10 +#define MQ_POLL_TIMEOUT (1000) + +namespace swss { + +class AsyncDBUpdater +{ +public: + AsyncDBUpdater(DBConnector *db, const std::string &tableName); + ~AsyncDBUpdater(); + + void update(std::shared_ptr pkco); + + size_t queueSize(); +private: + void dbUpdateThread(); + + volatile bool m_runThread; + + std::shared_ptr m_dbUpdateThread; + + std::mutex m_dbUpdateDataQueueMutex; + + std::condition_variable m_dbUpdateDataNotifyCv; + + std::queue> m_dbUpdateDataQueue; + + DBConnector *m_db; + + std::string m_tableName; +}; + +} diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index 217b7cf3..5795f1fa 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -26,13 +26,12 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string if (dbPersistence) { SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); - m_runThread = true; - m_dbUpdateThread = std::make_shared(&ZmqConsumerStateTable::dbUpdateThread, this); + m_asyncDBUpdater = std::make_unique(db, tableName); } else { SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str()); - m_dbUpdateThread = nullptr; + m_asyncDBUpdater = nullptr; } m_zmqServer.registerMessageHandler(m_db->getDbName(), tableName, this); @@ -40,22 +39,10 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str()); } -ZmqConsumerStateTable::~ZmqConsumerStateTable() -{ - if (m_dbUpdateThread != nullptr) - { - m_runThread = false; - - // notify db update thread exit - m_dbUpdateDataNotifyCv.notify_all(); - m_dbUpdateThread->join(); - } -} - void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr pkco) { std::shared_ptr clone = nullptr; - if (m_dbUpdateThread != nullptr) + if (m_asyncDBUpdater != nullptr) { // clone before put to received queue, because received data may change by consumer. clone = std::make_shared(*pkco); @@ -68,80 +55,9 @@ void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr lock(m_dbUpdateDataQueueMutex); - m_dbUpdateDataQueue.push(clone); - } - - m_dbUpdateDataNotifyCv.notify_all(); - } -} - -void ZmqConsumerStateTable::dbUpdateThread() -{ - SWSS_LOG_ENTER(); - SWSS_LOG_NOTICE("dbUpdateThread begin"); - - // Different schedule policy has different min priority - pthread_attr_t attr; - int policy; - pthread_attr_getschedpolicy(&attr, &policy); - int min_priority = sched_get_priority_min(policy); - // Use min priority will block poll thread - pthread_setschedprio(pthread_self(), min_priority + 1); - - // Follow same logic in ConsumerStateTable: every received data will write to 'table'. - DBConnector db(m_db->getDbName(), 0, true); - Table table(&db, getTableName()); - std::mutex cvMutex; - std::unique_lock cvLock(cvMutex); - - while (m_runThread) - { - m_dbUpdateDataNotifyCv.wait(cvLock); - - size_t count; - { - // size() is not thread safe - std::lock_guard lock(m_dbUpdateDataQueueMutex); - - // For new data append to m_dataQueue during pops, will not be include in result. - count = m_dbUpdateDataQueue.size(); - if (!count) - { - continue; - } - - } - - for (size_t ie = 0; ie < count; ie++) - { - auto& kco = *(m_dbUpdateDataQueue.front()); - - if (kfvOp(kco) == SET_COMMAND) - { - auto& values = kfvFieldsValues(kco); - - // Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry. - table.del(kfvKey(kco)); - table.set(kfvKey(kco), values); - } - else if (kfvOp(kco) == DEL_COMMAND) - { - table.del(kfvKey(kco)); - } - else - { - SWSS_LOG_ERROR("zmq consumer table: %s, receive unknown operation: %s", getTableName().c_str(), kfvOp(kco).c_str()); - } - - { - std::lock_guard lock(m_dbUpdateDataQueueMutex); - m_dbUpdateDataQueue.pop(); - } - } + m_asyncDBUpdater->update(clone); } } @@ -174,4 +90,15 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const } } +size_t ZmqConsumerStateTable::dbUpdaterQueueSize() +{ + if (m_asyncDBUpdater == nullptr) + { + throw system_error(make_error_code(errc::operation_not_supported), + "Database persistence is not enabled"); + } + + return m_asyncDBUpdater->queueSize(); +} + } diff --git a/common/zmqconsumerstatetable.h b/common/zmqconsumerstatetable.h index e61a666a..d18e5dc2 100644 --- a/common/zmqconsumerstatetable.h +++ b/common/zmqconsumerstatetable.h @@ -3,17 +3,13 @@ #include #include #include -#include "dbconnector.h" -#include "table.h" +#include "asyncdbupdater.h" #include "consumertablebase.h" +#include "dbconnector.h" #include "selectableevent.h" +#include "table.h" #include "zmqserver.h" -#define MQ_RESPONSE_MAX_COUNT (4*1024*1024) -#define MQ_SIZE 100 -#define MQ_MAX_RETRY 10 -#define MQ_POLL_TIMEOUT (1000) - namespace swss { class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMessageHandler @@ -22,8 +18,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes /* The default value of pop batch size is 128 */ static constexpr int DEFAULT_POP_BATCH_SIZE = 128; - ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = true); - ~ZmqConsumerStateTable(); + ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = false); /* Get multiple pop elements */ void pops(std::deque &vkco, const std::string &prefix = EMPTY_PREFIX); @@ -72,30 +67,22 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes return m_db; } + size_t dbUpdaterQueueSize(); + private: void handleReceivedData(std::shared_ptr pkco); - void dbUpdateThread(); - - volatile bool m_runThread; - std::mutex m_receivedQueueMutex; std::queue> m_receivedOperationQueue; swss::SelectableEvent m_selectableEvent; - std::shared_ptr m_dbUpdateThread; - - std::mutex m_dbUpdateDataQueueMutex; - - std::condition_variable m_dbUpdateDataNotifyCv; - - std::queue> m_dbUpdateDataQueue; - DBConnector *m_db; ZmqServer& m_zmqServer; + + std::unique_ptr m_asyncDBUpdater; }; } diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index d959b920..b2afbb75 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -18,27 +18,38 @@ using namespace std; namespace swss { -ZmqProducerStateTable::ZmqProducerStateTable(DBConnector *db, const string &tableName, ZmqClient &zmqClient) +ZmqProducerStateTable::ZmqProducerStateTable(DBConnector *db, const string &tableName, ZmqClient &zmqClient, bool dbPersistence) : ProducerStateTable(db, tableName), m_zmqClient(zmqClient), m_dbName(db->getDbName()), m_tableNameStr(tableName) { - initialize(); + initialize(db, tableName, dbPersistence); } -ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const string &tableName, ZmqClient &zmqClient, bool buffered) +ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const string &tableName, ZmqClient &zmqClient, bool buffered, bool dbPersistence) : ProducerStateTable(pipeline, tableName, buffered), m_zmqClient(zmqClient), m_dbName(pipeline->getDbName()), m_tableNameStr(tableName) { - initialize(); + initialize(pipeline->getDBConnector(), tableName, dbPersistence); } -void ZmqProducerStateTable::initialize() +void ZmqProducerStateTable::initialize(DBConnector *db, const std::string &tableName, bool dbPersistence) { m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); + + if (dbPersistence) + { + SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); + m_asyncDBUpdater = std::make_unique(db, tableName); + } + else + { + SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str()); + m_asyncDBUpdater = nullptr; + } } void ZmqProducerStateTable::set( @@ -54,6 +65,20 @@ void ZmqProducerStateTable::set( m_dbName, m_tableNameStr, m_sendbuffer); + + if (m_asyncDBUpdater != nullptr) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(); + kfvKey(*clone) = key; + kfvOp(*clone) = op; + for(const auto &value : values) + { + kfvFieldsValues(*clone).push_back(value); + } + + m_asyncDBUpdater->update(clone); + } } void ZmqProducerStateTable::del( @@ -68,16 +93,39 @@ void ZmqProducerStateTable::del( m_dbName, m_tableNameStr, m_sendbuffer); + + if (m_asyncDBUpdater != nullptr) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(); + kfvKey(*clone) = key; + kfvOp(*clone) = op; + + m_asyncDBUpdater->update(clone); + } } void ZmqProducerStateTable::set(const std::vector &values) { for (const auto &value : values) { - set( - kfvKey(value), - kfvFieldsValues(value), - SET_COMMAND); + m_zmqClient.sendMsg( + kfvKey(value), + kfvFieldsValues(value), + SET_COMMAND, + m_dbName, + m_tableNameStr, + m_sendbuffer); + } + + if (m_asyncDBUpdater != nullptr) + { + for (const auto &value : values) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(value); + m_asyncDBUpdater->update(clone); + } } } @@ -85,8 +133,37 @@ void ZmqProducerStateTable::del(const std::vector &keys) { for (const auto &key : keys) { - del(key, DEL_COMMAND); + m_zmqClient.sendMsg( + key, + vector(), + DEL_COMMAND, + m_dbName, + m_tableNameStr, + m_sendbuffer); } + + if (m_asyncDBUpdater != nullptr) + { + for (const auto &key : keys) + { + // async write need keep data till write to DB + std::shared_ptr clone = std::make_shared(); + kfvKey(*clone) = key; + kfvOp(*clone) = DEL_COMMAND; + m_asyncDBUpdater->update(clone); + } + } +} + +size_t ZmqProducerStateTable::dbUpdaterQueueSize() +{ + if (m_asyncDBUpdater == nullptr) + { + throw system_error(make_error_code(errc::operation_not_supported), + "Database persistence is not enabled"); + } + + return m_asyncDBUpdater->queueSize(); } } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 34ea3d71..8c784d42 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -5,9 +5,10 @@ #include #include #include -#include "table.h" -#include "redispipeline.h" +#include "asyncdbupdater.h" #include "producerstatetable.h" +#include "redispipeline.h" +#include "table.h" #include "zmqclient.h" namespace swss { @@ -15,8 +16,8 @@ namespace swss { class ZmqProducerStateTable : public ProducerStateTable { public: - ZmqProducerStateTable(DBConnector *db, const std::string &tableName, ZmqClient &zmqClient); - ZmqProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, ZmqClient &zmqClient, bool buffered = false); + ZmqProducerStateTable(DBConnector *db, const std::string &tableName, ZmqClient &zmqClient, bool dbPersistence = true); + ZmqProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, ZmqClient &zmqClient, bool buffered = false, bool dbPersistence = true); /* Implements set() and del() commands using notification messages */ virtual void set(const std::string &key, @@ -33,8 +34,9 @@ class ZmqProducerStateTable : public ProducerStateTable virtual void del(const std::vector &keys); + size_t dbUpdaterQueueSize(); private: - void initialize(); + void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); ZmqClient& m_zmqClient; @@ -42,6 +44,8 @@ class ZmqProducerStateTable : public ProducerStateTable const std::string m_dbName; const std::string m_tableNameStr; + + std::unique_ptr m_asyncDBUpdater; }; } diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 51e96a96..1e9d2254 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -54,11 +54,11 @@ static inline int readNumberAtEOL(const string& str) static bool allDataReceived = false; -static void producerWorker(string tableName, string endpoint) +static void producerWorker(string tableName, string endpoint, bool dbPersistence) { DBConnector db(TEST_DB, 0, true); ZmqClient client(endpoint); - ZmqProducerStateTable p(&db, tableName, client); + ZmqProducerStateTable p(&db, tableName, client, dbPersistence); cout << "Producer thread started: " << tableName << endl; for (int i = 0; i < NUMBER_OF_OPS; i++) @@ -109,11 +109,21 @@ static void producerWorker(string tableName, string endpoint) p.del(keys); } + // wait all data been received by consumer while (!allDataReceived) { sleep(1); } + if (dbPersistence) + { + // wait all persist data write to redis + while (p.dbUpdaterQueueSize() > 0) + { + sleep(1); + } + } + cout << "Producer thread ended: " << tableName << endl; } @@ -123,13 +133,13 @@ static int delCount = 0; static int batchSetCount = 0; static int batchDelCount = 0; -static void consumerWorker(string tableName, string endpoint) +static void consumerWorker(string tableName, string endpoint, bool dbPersistence) { cout << "Consumer thread started: " << tableName << endl; DBConnector db(TEST_DB, 0, true); ZmqServer server(endpoint); - ZmqConsumerStateTable c(&db, tableName, server); + ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence); Select cs; cs.addSelectable(&c); @@ -180,24 +190,42 @@ static void consumerWorker(string tableName, string endpoint) } allDataReceived = true; + + if (dbPersistence) + { + // wait all persist data write to redis + while (c.dbUpdaterQueueSize() > 0) + { + sleep(1); + } + } + cout << "Consumer thread ended: " << tableName << endl; } -TEST(ZmqConsumerStateTable, test) + +static void testMethod(bool producerPersistence) { std::string testTableName = "ZMQ_PROD_CONS_UT"; std::string pushEndpoint = "tcp://localhost:1234"; std::string pullEndpoint = "tcp://*:1234"; thread *producerThreads[NUMBER_OF_THREADS]; + // reset receive data counter + setCount = 0; + delCount = 0; + batchSetCount = 0; + batchDelCount = 0; + allDataReceived = false; + // start consumer first, SHM can only have 1 consumer per table. - thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint); + thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) { - producerThreads[i] = new thread(producerWorker, testTableName, pushEndpoint); + producerThreads[i] = new thread(producerWorker, testTableName, pushEndpoint, producerPersistence); } cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl; @@ -215,6 +243,7 @@ TEST(ZmqConsumerStateTable, test) EXPECT_EQ(batchSetCount, NUMBER_OF_THREADS * NUMBER_OF_OPS * MAX_KEYS); EXPECT_EQ(batchDelCount, NUMBER_OF_THREADS * NUMBER_OF_OPS * MAX_KEYS); + // check presist data in redis DBConnector db(TEST_DB, 0, true); Table table(&db, testTableName); std::vector keys; @@ -237,3 +266,16 @@ TEST(ZmqConsumerStateTable, test) cout << endl << "Done." << endl; } + +TEST(ZmqConsumerStateTable, test) +{ + // test with persist by consumer + testMethod(false); +} + + +TEST(ZmqProducerStateTable, test) +{ + // test with persist by producer + testMethod(true); +} \ No newline at end of file