Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq
  • Loading branch information
divyagayathri-hcl committed Nov 26, 2024
2 parents 5caad2d + 6bac82b commit c19af49
Show file tree
Hide file tree
Showing 50 changed files with 1,996 additions and 153 deletions.
13 changes: 12 additions & 1 deletion common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,18 @@ common_libswsscommon_la_SOURCES = \
common/zmqclient.cpp \
common/zmqserver.cpp \
common/asyncdbupdater.cpp \
common/redis_table_waiter.cpp
common/redis_table_waiter.cpp \
common/interface.h \
common/c-api/util.cpp \
common/c-api/dbconnector.cpp \
common/c-api/consumerstatetable.cpp \
common/c-api/producerstatetable.cpp \
common/c-api/subscriberstatetable.cpp \
common/c-api/zmqclient.cpp \
common/c-api/zmqserver.cpp \
common/c-api/zmqconsumerstatetable.cpp \
common/c-api/zmqproducerstatetable.cpp \
common/performancetimer.cpp

common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
common_libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS)
Expand Down
17 changes: 16 additions & 1 deletion common/asyncdbupdater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ AsyncDBUpdater::~AsyncDBUpdater()
// notify db update thread exit
m_dbUpdateDataNotifyCv.notify_all();
m_dbUpdateThread->join();
SWSS_LOG_DEBUG("AsyncDBUpdater dtor tableName: %s", m_tableName.c_str());
}

void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
Expand Down Expand Up @@ -61,16 +62,30 @@ void AsyncDBUpdater::dbUpdateThread()
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);

while (m_runThread)
while (true)
{
size_t count;
count = queueSize();
if (count == 0)
{
// Check if there still data in queue before exit
if (!m_runThread)
{
SWSS_LOG_NOTICE("dbUpdateThread for table: %s is exiting", m_tableName.c_str());
break;
}

// when queue is empty, wait notification, when data come, continue to check queue size again
m_dbUpdateDataNotifyCv.wait(cvLock);
continue;
}
else
{
if (!m_runThread)
{
SWSS_LOG_DEBUG("dbUpdateThread for table: %s still has %d records that need to be sent before exiting", m_tableName.c_str(), (int)count);
}
}

for (size_t ie = 0; ie < count; ie++)
{
Expand Down
26 changes: 24 additions & 2 deletions common/binaryserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define __BINARY_SERIALIZER__

#include "common/armhelper.h"
#include "common/rediscommand.h"
#include "common/table.h"

#include <string>

Expand All @@ -11,6 +13,26 @@ namespace swss {

class BinarySerializer {
public:
static size_t serializedSize(const string &dbName, const string &tableName,
const vector<KeyOpFieldsValuesTuple> &kcos) {
size_t n = 0;
n += dbName.size() + sizeof(size_t);
n += tableName.size() + sizeof(size_t);

for (const KeyOpFieldsValuesTuple &kco : kcos) {
const vector<FieldValueTuple> &fvs = kfvFieldsValues(kco);
n += kfvKey(kco).size() + sizeof(size_t);
n += to_string(fvs.size()).size() + sizeof(size_t);

for (const FieldValueTuple &fv : fvs) {
n += fvField(fv).size() + sizeof(size_t);
n += fvValue(fv).size() + sizeof(size_t);
}
}

return n + sizeof(size_t);
}

static size_t serializeBuffer(
const char* buffer,
const size_t size,
Expand Down Expand Up @@ -192,8 +214,8 @@ class BinarySerializer {
{
if ((size_t)(m_current_position - m_buffer + datalen + sizeof(size_t)) > m_buffer_size)
{
SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\
key count: %zu, data length %zu, buffer size: %zu",
SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\n"
" key count: %zu, data length %zu, buffer size: %zu",
m_kvp_count,
datalen,
m_buffer_size);
Expand Down
45 changes: 45 additions & 0 deletions common/c-api/consumerstatetable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include <boost/numeric/conversion/cast.hpp>
#include <cstdlib>
#include <cstring>
#include <deque>

#include "../consumerstatetable.h"
#include "../dbconnector.h"
#include "../table.h"
#include "consumerstatetable.h"
#include "util.h"

using namespace swss;
using namespace std;
using boost::numeric_cast;

SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName,
const int32_t *p_popBatchSize,
const int32_t *p_pri) {
int popBatchSize = p_popBatchSize ? numeric_cast<int>(*p_popBatchSize)
: TableConsumable::DEFAULT_POP_BATCH_SIZE;
int pri = p_pri ? numeric_cast<int>(*p_pri) : 0;
SWSSTry(return (SWSSConsumerStateTable) new ConsumerStateTable(
(DBConnector *)db, string(tableName), popBatchSize, pri));
}

void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl) {
SWSSTry(delete (ConsumerStateTable *)tbl);
}

SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl) {
SWSSTry({
deque<KeyOpFieldsValuesTuple> vkco;
((ConsumerStateTable *)tbl)->pops(vkco);
return makeKeyOpFieldValuesArray(vkco);
});
}

uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl) {
SWSSTry(return numeric_cast<uint32_t>(((ConsumerStateTable *)tbl)->getFd()));
}

SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms,
uint8_t interrupt_on_signal) {
SWSSTry(return selectOne((ConsumerStateTable *)tbl, timeout_ms, interrupt_on_signal));
}
39 changes: 39 additions & 0 deletions common/c-api/consumerstatetable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef SWSS_COMMON_C_API_CONSUMERSTATETABLE_H
#define SWSS_COMMON_C_API_CONSUMERSTATETABLE_H

#include "dbconnector.h"
#include "util.h"

#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>

typedef struct SWSSConsumerStateTableOpaque *SWSSConsumerStateTable;

// Pass NULL for popBatchSize and/or pri to use the default values
SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName,
const int32_t *popBatchSize, const int32_t *pri);

void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl);

// Result array and all of its members must be freed using free()
SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl);

// Return the underlying fd for polling/selecting on.
// Callers must NOT read/write on fd, it may only be used for epoll or similar.
// After the fd becomes readable, SWSSConsumerStateTable_readData must be used to
// reset the fd and read data into internal data structures.
uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl);

// Block until data is available to read or until a timeout elapses.
// A timeout of 0 means the call will return immediately.
SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms,
uint8_t interrupt_on_signal);

#ifdef __cplusplus
}
#endif

#endif
93 changes: 93 additions & 0 deletions common/c-api/dbconnector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#include <cstring>
#include <string>
#include <utility>

#include "../dbconnector.h"
#include "dbconnector.h"
#include "util.h"

using namespace swss;
using namespace std;

void SWSSSonicDBConfig_initialize(const char *path) {
SWSSTry(SonicDBConfig::initialize(path));
}

void SWSSSonicDBConfig_initializeGlobalConfig(const char *path) {
SWSSTry(SonicDBConfig::initializeGlobalConfig(path));
}

SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port,
uint32_t timeout) {
SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(hostname), port, timeout));
}

SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout) {
SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(sock_path), timeout));
}

SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn) {
SWSSTry(return (SWSSDBConnector) new DBConnector(string(dbName), timeout_ms, isTcpConn));
}

void SWSSDBConnector_free(SWSSDBConnector db) {
delete (DBConnector *)db;
}

int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key) {
SWSSTry(return ((DBConnector *)db)->del(string(key)) ? 1 : 0);
}

void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value) {
SWSSTry(((DBConnector *)db)->set(string(key), takeStrRef(value)));
}

SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key) {
SWSSTry({
shared_ptr<string> s = ((DBConnector *)db)->get(string(key));
return s ? makeString(move(*s)) : nullptr;
});
}

int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key) {
SWSSTry(return ((DBConnector *)db)->exists(string(key)) ? 1 : 0);
}

int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field) {
SWSSTry(return ((DBConnector *)db)->hdel(string(key), string(field)) ? 1 : 0);
}

void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field,
SWSSStrRef value) {
SWSSTry(((DBConnector *)db)->hset(string(key), string(field), takeStrRef(value)));
}

SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) {
SWSSTry({
shared_ptr<string> s = ((DBConnector *)db)->hget(string(key), string(field));
return s ? makeString(move(*s)) : nullptr;
});
}

SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key) {
SWSSTry({
auto map = ((DBConnector *)db)->hgetall(string(key));

// We can't move keys out of the map, we have to copy them, until C++17 map::extract so we
// copy them here into a vector to avoid needing an overload on makeFieldValueArray
vector<pair<string, string>> pairs;
pairs.reserve(map.size());
for (auto &pair : map)
pairs.push_back(make_pair(pair.first, move(pair.second)));

return makeFieldValueArray(std::move(pairs));
});
}

int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field) {
SWSSTry(return ((DBConnector *)db)->hexists(string(key), string(field)) ? 1 : 0);
}

int8_t SWSSDBConnector_flushdb(SWSSDBConnector db) {
SWSSTry(return ((DBConnector *)db)->flushdb() ? 1 : 0);
}
64 changes: 64 additions & 0 deletions common/c-api/dbconnector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#ifndef SWSS_COMMON_C_API_DBCONNECTOR_H
#define SWSS_COMMON_C_API_DBCONNECTOR_H

#include "util.h"
#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>

void SWSSSonicDBConfig_initialize(const char *path);

void SWSSSonicDBConfig_initializeGlobalConfig(const char *path);

typedef struct SWSSDBConnectorOpaque *SWSSDBConnector;

// Pass 0 to timeout for infinity
SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port,
uint32_t timeout_ms);

// Pass 0 to timeout for infinity
SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout_ms);

// Pass 0 to timeout for infinity
SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn);

void SWSSDBConnector_free(SWSSDBConnector db);

// Returns 0 when key doesn't exist, 1 when key was deleted
int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key);

void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value);

// Returns NULL if key doesn't exist
// Result must be freed using SWSSString_free()
SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key);

// Returns 0 for false, 1 for true
int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key);

// Returns 0 when key or field doesn't exist, 1 when field was deleted
int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field);

void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, SWSSStrRef value);

// Returns NULL if key or field doesn't exist
// Result must be freed using SWSSString_free()
SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field);

// Returns an empty map when the key doesn't exist
// Result array and all of its elements must be freed using appropriate free functions
SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key);

// Returns 0 when key or field doesn't exist, 1 when field exists
int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field);

// Returns 1 on success, 0 on failure
int8_t SWSSDBConnector_flushdb(SWSSDBConnector db);

#ifdef __cplusplus
}
#endif

#endif
Loading

0 comments on commit c19af49

Please sign in to comment.