Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async DB update for both ZMQ producer&consumer table. #821

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ common_libswsscommon_la_SOURCES = \
common/profileprovider.cpp \
common/zmqclient.cpp \
common/zmqserver.cpp \
common/asyncdbupdater.cpp \
common/redis_table_waiter.cpp

common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
Expand Down
114 changes: 114 additions & 0 deletions common/asyncdbupdater.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include <pthread.h>
#include "asyncdbupdater.h"
#include "dbconnector.h"
#include "redisselect.h"
#include "redisapi.h"
#include "table.h"

using namespace std;

namespace swss {

AsyncDBUpdater::AsyncDBUpdater(DBConnector *db, const std::string &tableName)
: m_db(db)
, m_tableName(tableName)
{
m_runThread = true;
m_dbUpdateThread = std::make_shared<std::thread>(&AsyncDBUpdater::dbUpdateThread, this);

SWSS_LOG_DEBUG("AsyncDBUpdater ctor tableName: %s", tableName.c_str());
}

AsyncDBUpdater::~AsyncDBUpdater()
{
m_runThread = false;

// notify db update thread exit
m_dbUpdateDataNotifyCv.notify_all();
m_dbUpdateThread->join();
}

void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
{
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.push(pkco);
}

m_dbUpdateDataNotifyCv.notify_all();
}

void AsyncDBUpdater::dbUpdateThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("dbUpdateThread begin");

// Different schedule policy has different min priority
pthread_attr_t attr;
int policy;
pthread_attr_getschedpolicy(&attr, &policy);
int min_priority = sched_get_priority_min(policy);
// Use min priority will block poll thread
pthread_setschedprio(pthread_self(), min_priority + 1);

// Follow same logic in ConsumerStateTable: every received data will write to 'table'.
DBConnector db(m_db->getDbName(), 0, true);
Table table(&db, m_tableName);
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);

while (m_runThread)
{
size_t count;
count = queueSize();
if (count == 0)
{
// when queue is empty, wait notification, when data come, continue to check queue size again
m_dbUpdateDataNotifyCv.wait(cvLock);
continue;
}

for (size_t ie = 0; ie < count; ie++)
{
auto& kco = *(m_dbUpdateDataQueue.front());

if (kfvOp(kco) == SET_COMMAND)
{
auto& values = kfvFieldsValues(kco);

// Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry.
table.del(kfvKey(kco));
table.set(kfvKey(kco), values);
}
else if (kfvOp(kco) == DEL_COMMAND)
{
table.del(kfvKey(kco));
}
else
{
SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str());
}

{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.pop();
}
}
}

SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str());
}

size_t AsyncDBUpdater::queueSize()
{
// size() is not thread safe
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);

return m_dbUpdateDataQueue.size();
}

}
43 changes: 43 additions & 0 deletions common/asyncdbupdater.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include <string>
#include <deque>
#include <condition_variable>
#include "dbconnector.h"
#include "table.h"

#define MQ_RESPONSE_MAX_COUNT (4*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)

namespace swss {

class AsyncDBUpdater
{
public:
AsyncDBUpdater(DBConnector *db, const std::string &tableName);
~AsyncDBUpdater();

void update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco);

size_t queueSize();
private:
void dbUpdateThread();

volatile bool m_runThread;

std::shared_ptr<std::thread> m_dbUpdateThread;

std::mutex m_dbUpdateDataQueueMutex;

std::condition_variable m_dbUpdateDataNotifyCv;

std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_dbUpdateDataQueue;

DBConnector *m_db;

std::string m_tableName;
};

}
105 changes: 16 additions & 89 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,23 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string
if (dbPersistence)
{
SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str());
m_runThread = true;
m_dbUpdateThread = std::make_shared<std::thread>(&ZmqConsumerStateTable::dbUpdateThread, this);
m_asyncDBUpdater = std::make_unique<AsyncDBUpdater>(db, tableName);
}
else
{
SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str());
m_dbUpdateThread = nullptr;
m_asyncDBUpdater = nullptr;
}

m_zmqServer.registerMessageHandler(m_db->getDbName(), tableName, this);

SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str());
}

ZmqConsumerStateTable::~ZmqConsumerStateTable()
{
if (m_dbUpdateThread != nullptr)
{
m_runThread = false;

// notify db update thread exit
m_dbUpdateDataNotifyCv.notify_all();
m_dbUpdateThread->join();
}
}

void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
{
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
if (m_dbUpdateThread != nullptr)
if (m_asyncDBUpdater != nullptr)
{
// clone before put to received queue, because received data may change by consumer.
clone = std::make_shared<KeyOpFieldsValuesTuple>(*pkco);
Expand All @@ -68,80 +55,9 @@ void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr<KeyOpFieldsValues

m_selectableEvent.notify(); // will release epoll

if (m_dbUpdateThread != nullptr)
if (m_asyncDBUpdater != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.push(clone);
}

m_dbUpdateDataNotifyCv.notify_all();
}
}

void ZmqConsumerStateTable::dbUpdateThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("dbUpdateThread begin");

// Different schedule policy has different min priority
pthread_attr_t attr;
int policy;
pthread_attr_getschedpolicy(&attr, &policy);
int min_priority = sched_get_priority_min(policy);
// Use min priority will block poll thread
pthread_setschedprio(pthread_self(), min_priority + 1);

// Follow same logic in ConsumerStateTable: every received data will write to 'table'.
DBConnector db(m_db->getDbName(), 0, true);
Table table(&db, getTableName());
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);

while (m_runThread)
{
m_dbUpdateDataNotifyCv.wait(cvLock);

size_t count;
{
// size() is not thread safe
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);

// For new data append to m_dataQueue during pops, will not be include in result.
count = m_dbUpdateDataQueue.size();
if (!count)
{
continue;
}

}

for (size_t ie = 0; ie < count; ie++)
{
auto& kco = *(m_dbUpdateDataQueue.front());

if (kfvOp(kco) == SET_COMMAND)
{
auto& values = kfvFieldsValues(kco);

// Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry.
table.del(kfvKey(kco));
table.set(kfvKey(kco), values);
}
else if (kfvOp(kco) == DEL_COMMAND)
{
table.del(kfvKey(kco));
}
else
{
SWSS_LOG_ERROR("zmq consumer table: %s, receive unknown operation: %s", getTableName().c_str(), kfvOp(kco).c_str());
}

{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.pop();
}
}
m_asyncDBUpdater->update(clone);
}
}

Expand Down Expand Up @@ -174,4 +90,15 @@ void ZmqConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const
}
}

size_t ZmqConsumerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
{
throw system_error(make_error_code(errc::operation_not_supported),
"Database persistence is not enabled");
}

return m_asyncDBUpdater->queueSize();
}

}
24 changes: 8 additions & 16 deletions common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
#include <string>
#include <deque>
#include <condition_variable>
#include "dbconnector.h"
#include "table.h"
#include "asyncdbupdater.h"
#include "consumertablebase.h"
#include "dbconnector.h"
#include "selectableevent.h"
#include "table.h"
#include "zmqserver.h"

#define MQ_RESPONSE_MAX_COUNT (4*1024*1024)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This MACRO has been defined in asyncdbupdater.h, why redefine it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, remove these dupe macro

Expand All @@ -22,8 +23,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
/* The default value of pop batch size is 128 */
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = true);
~ZmqConsumerStateTable();
ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = false);

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
Expand Down Expand Up @@ -72,30 +72,22 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
return m_db;
}

size_t dbUpdaterQueueSize();

private:
void handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco);

void dbUpdateThread();

volatile bool m_runThread;

std::mutex m_receivedQueueMutex;

std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_receivedOperationQueue;

swss::SelectableEvent m_selectableEvent;

std::shared_ptr<std::thread> m_dbUpdateThread;

std::mutex m_dbUpdateDataQueueMutex;

std::condition_variable m_dbUpdateDataNotifyCv;

std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_dbUpdateDataQueue;

DBConnector *m_db;

ZmqServer& m_zmqServer;

std::unique_ptr<AsyncDBUpdater> m_asyncDBUpdater;
};

}
Loading
Loading