diff --git a/common/Makefile.am b/common/Makefile.am index 18cfd803..724805e6 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -68,7 +68,18 @@ common_libswsscommon_la_SOURCES = \ common/zmqclient.cpp \ common/zmqserver.cpp \ common/asyncdbupdater.cpp \ - common/redis_table_waiter.cpp + common/redis_table_waiter.cpp \ + common/interface.h \ + common/c-api/util.cpp \ + common/c-api/dbconnector.cpp \ + common/c-api/consumerstatetable.cpp \ + common/c-api/producerstatetable.cpp \ + common/c-api/subscriberstatetable.cpp \ + common/c-api/zmqclient.cpp \ + common/c-api/zmqserver.cpp \ + common/c-api/zmqconsumerstatetable.cpp \ + common/c-api/zmqproducerstatetable.cpp \ + common/performancetimer.cpp common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS) common_libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS) diff --git a/common/asyncdbupdater.cpp b/common/asyncdbupdater.cpp index 4cf150d9..cf3d74c5 100644 --- a/common/asyncdbupdater.cpp +++ b/common/asyncdbupdater.cpp @@ -30,6 +30,7 @@ AsyncDBUpdater::~AsyncDBUpdater() // notify db update thread exit m_dbUpdateDataNotifyCv.notify_all(); m_dbUpdateThread->join(); + SWSS_LOG_DEBUG("AsyncDBUpdater dtor tableName: %s", m_tableName.c_str()); } void AsyncDBUpdater::update(std::shared_ptr pkco) @@ -61,16 +62,30 @@ void AsyncDBUpdater::dbUpdateThread() std::mutex cvMutex; std::unique_lock cvLock(cvMutex); - while (m_runThread) + while (true) { size_t count; count = queueSize(); if (count == 0) { + // Check if there still data in queue before exit + if (!m_runThread) + { + SWSS_LOG_NOTICE("dbUpdateThread for table: %s is exiting", m_tableName.c_str()); + break; + } + // when queue is empty, wait notification, when data come, continue to check queue size again m_dbUpdateDataNotifyCv.wait(cvLock); continue; } + else + { + if (!m_runThread) + { + SWSS_LOG_DEBUG("dbUpdateThread for table: %s still has %d records that need to be sent before exiting", m_tableName.c_str(), (int)count); + } + } for (size_t ie = 0; ie < count; ie++) { diff --git a/common/binaryserializer.h b/common/binaryserializer.h index 413ca501..6ae4dcd2 100644 --- a/common/binaryserializer.h +++ b/common/binaryserializer.h @@ -2,6 +2,8 @@ #define __BINARY_SERIALIZER__ #include "common/armhelper.h" +#include "common/rediscommand.h" +#include "common/table.h" #include @@ -11,6 +13,26 @@ namespace swss { class BinarySerializer { public: + static size_t serializedSize(const string &dbName, const string &tableName, + const vector &kcos) { + size_t n = 0; + n += dbName.size() + sizeof(size_t); + n += tableName.size() + sizeof(size_t); + + for (const KeyOpFieldsValuesTuple &kco : kcos) { + const vector &fvs = kfvFieldsValues(kco); + n += kfvKey(kco).size() + sizeof(size_t); + n += to_string(fvs.size()).size() + sizeof(size_t); + + for (const FieldValueTuple &fv : fvs) { + n += fvField(fv).size() + sizeof(size_t); + n += fvValue(fv).size() + sizeof(size_t); + } + } + + return n + sizeof(size_t); + } + static size_t serializeBuffer( const char* buffer, const size_t size, @@ -192,8 +214,8 @@ class BinarySerializer { { if ((size_t)(m_current_position - m_buffer + datalen + sizeof(size_t)) > m_buffer_size) { - SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\ - key count: %zu, data length %zu, buffer size: %zu", + SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\n" + " key count: %zu, data length %zu, buffer size: %zu", m_kvp_count, datalen, m_buffer_size); diff --git a/common/c-api/consumerstatetable.cpp b/common/c-api/consumerstatetable.cpp new file mode 100644 index 00000000..9765ceec --- /dev/null +++ b/common/c-api/consumerstatetable.cpp @@ -0,0 +1,45 @@ +#include +#include +#include +#include + +#include "../consumerstatetable.h" +#include "../dbconnector.h" +#include "../table.h" +#include "consumerstatetable.h" +#include "util.h" + +using namespace swss; +using namespace std; +using boost::numeric_cast; + +SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *p_popBatchSize, + const int32_t *p_pri) { + int popBatchSize = p_popBatchSize ? numeric_cast(*p_popBatchSize) + : TableConsumable::DEFAULT_POP_BATCH_SIZE; + int pri = p_pri ? numeric_cast(*p_pri) : 0; + SWSSTry(return (SWSSConsumerStateTable) new ConsumerStateTable( + (DBConnector *)db, string(tableName), popBatchSize, pri)); +} + +void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl) { + SWSSTry(delete (ConsumerStateTable *)tbl); +} + +SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl) { + SWSSTry({ + deque vkco; + ((ConsumerStateTable *)tbl)->pops(vkco); + return makeKeyOpFieldValuesArray(vkco); + }); +} + +uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl) { + SWSSTry(return numeric_cast(((ConsumerStateTable *)tbl)->getFd())); +} + +SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms, + uint8_t interrupt_on_signal) { + SWSSTry(return selectOne((ConsumerStateTable *)tbl, timeout_ms, interrupt_on_signal)); +} diff --git a/common/c-api/consumerstatetable.h b/common/c-api/consumerstatetable.h new file mode 100644 index 00000000..468fb644 --- /dev/null +++ b/common/c-api/consumerstatetable.h @@ -0,0 +1,39 @@ +#ifndef SWSS_COMMON_C_API_CONSUMERSTATETABLE_H +#define SWSS_COMMON_C_API_CONSUMERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSConsumerStateTableOpaque *SWSSConsumerStateTable; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *popBatchSize, const int32_t *pri); + +void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl); + +// Result array and all of its members must be freed using free() +SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl); + +// Return the underlying fd for polling/selecting on. +// Callers must NOT read/write on fd, it may only be used for epoll or similar. +// After the fd becomes readable, SWSSConsumerStateTable_readData must be used to +// reset the fd and read data into internal data structures. +uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl); + +// Block until data is available to read or until a timeout elapses. +// A timeout of 0 means the call will return immediately. +SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms, + uint8_t interrupt_on_signal); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/dbconnector.cpp b/common/c-api/dbconnector.cpp new file mode 100644 index 00000000..83f237cc --- /dev/null +++ b/common/c-api/dbconnector.cpp @@ -0,0 +1,93 @@ +#include +#include +#include + +#include "../dbconnector.h" +#include "dbconnector.h" +#include "util.h" + +using namespace swss; +using namespace std; + +void SWSSSonicDBConfig_initialize(const char *path) { + SWSSTry(SonicDBConfig::initialize(path)); +} + +void SWSSSonicDBConfig_initializeGlobalConfig(const char *path) { + SWSSTry(SonicDBConfig::initializeGlobalConfig(path)); +} + +SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port, + uint32_t timeout) { + SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(hostname), port, timeout)); +} + +SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout) { + SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(sock_path), timeout)); +} + +SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn) { + SWSSTry(return (SWSSDBConnector) new DBConnector(string(dbName), timeout_ms, isTcpConn)); +} + +void SWSSDBConnector_free(SWSSDBConnector db) { + delete (DBConnector *)db; +} + +int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key) { + SWSSTry(return ((DBConnector *)db)->del(string(key)) ? 1 : 0); +} + +void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value) { + SWSSTry(((DBConnector *)db)->set(string(key), takeStrRef(value))); +} + +SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key) { + SWSSTry({ + shared_ptr s = ((DBConnector *)db)->get(string(key)); + return s ? makeString(move(*s)) : nullptr; + }); +} + +int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key) { + SWSSTry(return ((DBConnector *)db)->exists(string(key)) ? 1 : 0); +} + +int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field) { + SWSSTry(return ((DBConnector *)db)->hdel(string(key), string(field)) ? 1 : 0); +} + +void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, + SWSSStrRef value) { + SWSSTry(((DBConnector *)db)->hset(string(key), string(field), takeStrRef(value))); +} + +SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) { + SWSSTry({ + shared_ptr s = ((DBConnector *)db)->hget(string(key), string(field)); + return s ? makeString(move(*s)) : nullptr; + }); +} + +SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key) { + SWSSTry({ + auto map = ((DBConnector *)db)->hgetall(string(key)); + + // We can't move keys out of the map, we have to copy them, until C++17 map::extract so we + // copy them here into a vector to avoid needing an overload on makeFieldValueArray + vector> pairs; + pairs.reserve(map.size()); + for (auto &pair : map) + pairs.push_back(make_pair(pair.first, move(pair.second))); + + return makeFieldValueArray(std::move(pairs)); + }); +} + +int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field) { + SWSSTry(return ((DBConnector *)db)->hexists(string(key), string(field)) ? 1 : 0); +} + +int8_t SWSSDBConnector_flushdb(SWSSDBConnector db) { + SWSSTry(return ((DBConnector *)db)->flushdb() ? 1 : 0); +} diff --git a/common/c-api/dbconnector.h b/common/c-api/dbconnector.h new file mode 100644 index 00000000..fe4acdf4 --- /dev/null +++ b/common/c-api/dbconnector.h @@ -0,0 +1,64 @@ +#ifndef SWSS_COMMON_C_API_DBCONNECTOR_H +#define SWSS_COMMON_C_API_DBCONNECTOR_H + +#include "util.h" +#ifdef __cplusplus +extern "C" { +#endif + +#include + +void SWSSSonicDBConfig_initialize(const char *path); + +void SWSSSonicDBConfig_initializeGlobalConfig(const char *path); + +typedef struct SWSSDBConnectorOpaque *SWSSDBConnector; + +// Pass 0 to timeout for infinity +SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port, + uint32_t timeout_ms); + +// Pass 0 to timeout for infinity +SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout_ms); + +// Pass 0 to timeout for infinity +SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn); + +void SWSSDBConnector_free(SWSSDBConnector db); + +// Returns 0 when key doesn't exist, 1 when key was deleted +int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key); + +void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value); + +// Returns NULL if key doesn't exist +// Result must be freed using SWSSString_free() +SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key); + +// Returns 0 for false, 1 for true +int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key); + +// Returns 0 when key or field doesn't exist, 1 when field was deleted +int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field); + +void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, SWSSStrRef value); + +// Returns NULL if key or field doesn't exist +// Result must be freed using SWSSString_free() +SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field); + +// Returns an empty map when the key doesn't exist +// Result array and all of its elements must be freed using appropriate free functions +SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key); + +// Returns 0 when key or field doesn't exist, 1 when field exists +int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field); + +// Returns 1 on success, 0 on failure +int8_t SWSSDBConnector_flushdb(SWSSDBConnector db); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/producerstatetable.cpp b/common/c-api/producerstatetable.cpp new file mode 100644 index 00000000..276d7c68 --- /dev/null +++ b/common/c-api/producerstatetable.cpp @@ -0,0 +1,53 @@ +#include +#include + +#include "../dbconnector.h" +#include "../producerstatetable.h" +#include "dbconnector.h" +#include "producerstatetable.h" +#include "util.h" + +using namespace swss; +using namespace std; + +SWSSProducerStateTable SWSSProducerStateTable_new(SWSSDBConnector db, const char *tableName) { + SWSSTry(return (SWSSProducerStateTable) new ProducerStateTable((DBConnector *)db, + string(tableName))); +} + +void SWSSProducerStateTable_free(SWSSProducerStateTable tbl) { + SWSSTry(delete ((ProducerStateTable *)tbl)); +} + +void SWSSProducerStateTable_setBuffered(SWSSProducerStateTable tbl, uint8_t buffered) { + SWSSTry(((ProducerStateTable *)tbl)->setBuffered((bool)buffered)) +} + +void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key, + SWSSFieldValueArray values) { + SWSSTry(((ProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(std::move(values)))); +} + +void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key) { + SWSSTry(((ProducerStateTable *)tbl)->del(string(key))); +} + +void SWSSProducerStateTable_flush(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->flush()); +} + +int64_t SWSSProducerStateTable_count(SWSSProducerStateTable tbl) { + SWSSTry(return ((ProducerStateTable *)tbl)->count()); +} + +void SWSSProducerStateTable_clear(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->clear()); +} + +void SWSSProducerStateTable_create_temp_view(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->create_temp_view()); +} + +void SWSSProducerStateTable_apply_temp_view(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->apply_temp_view()); +} diff --git a/common/c-api/producerstatetable.h b/common/c-api/producerstatetable.h new file mode 100644 index 00000000..1acb9af3 --- /dev/null +++ b/common/c-api/producerstatetable.h @@ -0,0 +1,39 @@ +#ifndef SWSS_COMMON_C_API_PRODUCERSTATETABLE_H +#define SWSS_COMMON_C_API_PRODUCERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSProducerStateTableOpaque *SWSSProducerStateTable; + +SWSSProducerStateTable SWSSProducerStateTable_new(SWSSDBConnector db, const char *tableName); + +void SWSSProducerStateTable_free(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_setBuffered(SWSSProducerStateTable tbl, uint8_t buffered); + +void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key, SWSSFieldValueArray values); + +void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key); + +void SWSSProducerStateTable_flush(SWSSProducerStateTable tbl); + +int64_t SWSSProducerStateTable_count(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_clear(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_create_temp_view(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_apply_temp_view(SWSSProducerStateTable tbl); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/subscriberstatetable.cpp b/common/c-api/subscriberstatetable.cpp new file mode 100644 index 00000000..4d3a0495 --- /dev/null +++ b/common/c-api/subscriberstatetable.cpp @@ -0,0 +1,47 @@ +#include +#include +#include +#include +#include + +#include "../dbconnector.h" +#include "../subscriberstatetable.h" +#include "../table.h" +#include "subscriberstatetable.h" +#include "util.h" + +using namespace swss; +using namespace std; +using boost::numeric_cast; + +SWSSSubscriberStateTable SWSSSubscriberStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *p_popBatchSize, + const int32_t *p_pri) { + int popBatchSize = p_popBatchSize ? numeric_cast(*p_popBatchSize) + : TableConsumable::DEFAULT_POP_BATCH_SIZE; + int pri = p_pri ? numeric_cast(*p_pri) : 0; + SWSSTry(return (SWSSSubscriberStateTable) new SubscriberStateTable( + (DBConnector *)db, string(tableName), popBatchSize, pri)); +} + +void SWSSSubscriberStateTable_free(SWSSSubscriberStateTable tbl) { + delete (SubscriberStateTable *)tbl; +} + +SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable tbl) { + SWSSTry({ + deque vkco; + ((SubscriberStateTable *)tbl)->pops(vkco); + return makeKeyOpFieldValuesArray(vkco); + }); +} + +uint32_t SWSSSubscriberStateTable_getFd(SWSSSubscriberStateTable tbl) { + SWSSTry(return numeric_cast(((SubscriberStateTable *)tbl)->getFd())); +} + +SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl, + uint32_t timeout_ms, + uint8_t interrupt_on_signal) { + SWSSTry(return selectOne((SubscriberStateTable *)tbl, timeout_ms, interrupt_on_signal)); +} diff --git a/common/c-api/subscriberstatetable.h b/common/c-api/subscriberstatetable.h new file mode 100644 index 00000000..ed0924c8 --- /dev/null +++ b/common/c-api/subscriberstatetable.h @@ -0,0 +1,41 @@ +#ifndef SWSS_COMMON_C_API_SUBSCRIBERSTATETABLE_H +#define SWSS_COMMON_C_API_SUBSCRIBERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSSubscriberStateTableOpaque *SWSSSubscriberStateTable; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSSubscriberStateTable SWSSSubscriberStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *popBatchSize, + const int32_t *pri); + +void SWSSSubscriberStateTable_free(SWSSSubscriberStateTable tbl); + +// Result array and all of its members must be freed using free() +SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable tbl); + +// Return the underlying fd for polling/selecting on. +// Callers must NOT read/write on fd, it may only be used for epoll or similar. +// After the fd becomes readable, SWSSSubscriberStateTable_readData must be used to +// reset the fd and read data into internal data structures. +uint32_t SWSSSubscriberStateTable_getFd(SWSSSubscriberStateTable tbl); + +// Block until data is available to read or until a timeout elapses. +// A timeout of 0 means the call will return immediately. +SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl, + uint32_t timeout_ms, + uint8_t interrupt_on_sugnal); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/util.cpp b/common/c-api/util.cpp new file mode 100644 index 00000000..1dc6cd45 --- /dev/null +++ b/common/c-api/util.cpp @@ -0,0 +1,33 @@ +#include "util.h" + +using namespace swss; + +bool swss::cApiTestingDisableAbort = false; + +SWSSString SWSSString_new(const char *data, uint64_t length) { + SWSSTry(return makeString(std::string(data, numeric_cast(length)))); +} + +SWSSString SWSSString_new_c_str(const char *c_str) { + SWSSTry(return makeString(std::string(c_str))); +} + +const char *SWSSStrRef_c_str(SWSSStrRef s) { + SWSSTry(return ((std::string *)s)->c_str()); +} + +uint64_t SWSSStrRef_length(SWSSStrRef s) { + SWSSTry(return ((std::string *)s)->length()); +} + +void SWSSString_free(SWSSString s) { + SWSSTry(delete (std::string *)s); +} + +void SWSSFieldValueArray_free(SWSSFieldValueArray arr) { + SWSSTry(delete[] arr.data); +} + +void SWSSKeyOpFieldValuesArray_free(SWSSKeyOpFieldValuesArray kfvs) { + SWSSTry(delete[] kfvs.data); +} diff --git a/common/c-api/util.h b/common/c-api/util.h new file mode 100644 index 00000000..06aeac15 --- /dev/null +++ b/common/c-api/util.h @@ -0,0 +1,267 @@ +#ifndef SWSS_COMMON_C_API_UTIL_H +#define SWSS_COMMON_C_API_UTIL_H + +// External utilities (c-facing) +#ifdef __cplusplus +extern "C" { +#endif + +#include + +// FFI version of std::string&& +// This can be converted to an SWSSStrRef with a standard cast +typedef struct SWSSStringOpaque *SWSSString; + +// FFI version of std::string& +// This can be converted to an SWSSString with a standard cast +// Functions that take SWSSString will move data out of the underlying string, +// but functions that take SWSSStrRef will only view it. +typedef struct SWSSStrRefOpaque *SWSSStrRef; + +// FFI version of swss::FieldValueTuple +typedef struct { + const char *field; + SWSSString value; +} SWSSFieldValueTuple; + +// FFI version of std::vector +typedef struct { + uint64_t len; + SWSSFieldValueTuple *data; +} SWSSFieldValueArray; + +typedef enum { + SWSSKeyOperation_SET, + SWSSKeyOperation_DEL, +} SWSSKeyOperation; + +// FFI version of swss::KeyOpFieldValuesTuple +typedef struct { + const char *key; + SWSSKeyOperation operation; + SWSSFieldValueArray fieldValues; +} SWSSKeyOpFieldValues; + +// FFI version of std::vector +typedef struct { + uint64_t len; + SWSSKeyOpFieldValues *data; +} SWSSKeyOpFieldValuesArray; + +// FFI version of swss::Select::{OBJECT, TIMEOUT, SIGNALINT}. +// swss::Select::ERROR is left out because errors are handled separately +typedef enum { + // Data is available in the object + SWSSSelectResult_DATA = 0, + // Timed out waiting for data + SWSSSelectResult_TIMEOUT = 1, + // Waiting was interrupted by a signal + SWSSSelectResult_SIGNAL = 2, +} SWSSSelectResult; + +// data should not include a null terminator +SWSSString SWSSString_new(const char *data, uint64_t length); + +// c_str should include a null terminator +SWSSString SWSSString_new_c_str(const char *c_str); + +// It is safe to pass null to this function (not to any other SWSSString functions). This is +// useful to take SWSSStrings from other SWSS structs - you can replace the strs in the +// structs with null and still safely free the structs. Then, you can call this function with the +// populated SWSSString later. +void SWSSString_free(SWSSString s); + +const char *SWSSStrRef_c_str(SWSSStrRef s); + +// Returns the length of the string, not including the null terminator that is implicitly added by +// SWSSStrRef_c_str. +uint64_t SWSSStrRef_length(SWSSStrRef s); + +// arr.data may be null. This is not recursive - elements must be freed separately (for finer +// grained control of ownership). +void SWSSFieldValueArray_free(SWSSFieldValueArray arr); + +// kfvs.data may be null. This is not recursive - elements must be freed separately (for finer +// grained control of ownership). +void SWSSKeyOpFieldValuesArray_free(SWSSKeyOpFieldValuesArray kfvs); + +#ifdef __cplusplus +} +#endif + +// Internal utilities (used to help define c-facing functions) +#ifdef __cplusplus + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../logger.h" +#include "../redisapi.h" +#include "../schema.h" +#include "../select.h" + +using boost::numeric_cast; + +namespace swss { + +extern bool cApiTestingDisableAbort; + +// In the catch block, we must abort because passing an exception across an ffi boundary is +// undefined behavior. It was also decided that no exceptions in swss-common are recoverable, so +// there is no reason to convert exceptions into a returnable type. +#define SWSSTry(...) \ + if (swss::cApiTestingDisableAbort) { \ + { __VA_ARGS__; } \ + } else { \ + try { \ + { __VA_ARGS__; } \ + } catch (std::exception & e) { \ + std::cerr << "Aborting due to exception: " << e.what() << std::endl; \ + SWSS_LOG_ERROR("Aborting due to exception: %s", e.what()); \ + std::abort(); \ + } \ + } + +static inline SWSSSelectResult selectOne(swss::Selectable *s, uint32_t timeout_ms, + uint8_t interrupt_on_signal) { + Select select; + Selectable *sOut; + select.addSelectable(s); + int ret = select.select(&sOut, numeric_cast(timeout_ms), interrupt_on_signal); + switch (ret) { + case Select::OBJECT: + return SWSSSelectResult_DATA; + case Select::ERROR: + throw std::system_error(errno, std::generic_category()); + case Select::TIMEOUT: + return SWSSSelectResult_TIMEOUT; + case Select::SIGNALINT: + return SWSSSelectResult_SIGNAL; + default: + SWSS_LOG_THROW("impossible: unhandled Select::select() return value: %d", ret); + } +} + +static inline SWSSString makeString(std::string &&s) { + std::string *data_s = new std::string(std::move(s)); + return (struct SWSSStringOpaque *)data_s; +} + +// T is anything that has a .size() method and which can be iterated over for pair +// eg vector> +template static inline SWSSFieldValueArray makeFieldValueArray(T &&in) { + SWSSFieldValueTuple *data = new SWSSFieldValueTuple[in.size()]; + + size_t i = 0; + for (auto &pair : in) { + SWSSFieldValueTuple entry; + entry.field = strdup(pair.first.c_str()); + entry.value = makeString(std::move(pair.second)); + data[i++] = entry; + } + + SWSSFieldValueArray out; + out.len = (uint64_t)in.size(); + out.data = data; + return out; +} + +static inline SWSSKeyOperation makeKeyOperation(std::string &op) { + if (strcmp(op.c_str(), SET_COMMAND) == 0) { + return SWSSKeyOperation_SET; + } else if (strcmp(op.c_str(), DEL_COMMAND) == 0) { + return SWSSKeyOperation_DEL; + } else { + SWSS_LOG_THROW("Invalid key operation %s", op.c_str()); + } +} + +static inline SWSSKeyOpFieldValues makeKeyOpFieldValues(swss::KeyOpFieldsValuesTuple &&in) { + SWSSKeyOpFieldValues out; + out.key = strdup(kfvKey(in).c_str()); + out.operation = makeKeyOperation(kfvOp(in)); + out.fieldValues = makeFieldValueArray(kfvFieldsValues(in)); + return out; +} + +template static inline T &getReference(T &t) { + return t; +} + +template static inline T &getReference(std::shared_ptr &t) { + return *t; +} + +// T is anything that has a .size() method and which can be iterated over for +// swss::KeyOpFieldValuesTuple, eg vector or deque +template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesArray(T &&in) { + SWSSKeyOpFieldValues *data = new SWSSKeyOpFieldValues[in.size()]; + + size_t i = 0; + for (auto &kfv : in) + data[i++] = makeKeyOpFieldValues(std::move(getReference(kfv))); + + SWSSKeyOpFieldValuesArray out; + out.len = (uint64_t)in.size(); + out.data = data; + return out; +} + +static inline std::string takeString(SWSSString s) { + return std::string(std::move(*((std::string *)s))); +} + +static inline std::string &takeStrRef(SWSSStrRef s) { + return *((std::string *)s); +} + +static inline std::vector takeFieldValueArray(SWSSFieldValueArray in) { + std::vector out; + for (uint64_t i = 0; i < in.len; i++) { + const char *field = in.data[i].field; + SWSSString value = in.data[i].value; + auto pair = std::make_pair(std::string(field), takeString(std::move(value))); + out.push_back(pair); + } + return out; +} + +static inline std::string takeKeyOperation(SWSSKeyOperation op) { + switch (op) { + case SWSSKeyOperation_SET: + return SET_COMMAND; + case SWSSKeyOperation_DEL: + return DEL_COMMAND; + default: + SWSS_LOG_THROW("Impossible SWSSKeyOperation"); + } +} + +static inline swss::KeyOpFieldsValuesTuple takeKeyOpFieldValues(SWSSKeyOpFieldValues in) { + std::string key = in.key; + std::string op = takeKeyOperation(in.operation); + auto fieldValues = takeFieldValueArray(in.fieldValues); + return std::make_tuple(key, op, fieldValues); +} + +static inline std::vector +takeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray in) { + std::vector out; + for (uint64_t i = 0; i < in.len; i++) { + SWSSKeyOpFieldValues kfv = in.data[i]; + out.push_back(takeKeyOpFieldValues(std::move(kfv))); + } + return out; +} + +} // namespace swss + +#endif +#endif diff --git a/common/c-api/zmqclient.cpp b/common/c-api/zmqclient.cpp new file mode 100644 index 00000000..fa1d59ca --- /dev/null +++ b/common/c-api/zmqclient.cpp @@ -0,0 +1,33 @@ +#include "../zmqclient.h" +#include "../binaryserializer.h" +#include "util.h" +#include "zmqclient.h" + +using namespace swss; +using namespace std; + +SWSSZmqClient SWSSZmqClient_new(const char *endpoint) { + SWSSTry(return (SWSSZmqClient) new ZmqClient(endpoint)); +} + +void SWSSZmqClient_free(SWSSZmqClient zmqc) { + SWSSTry(delete (ZmqClient *)zmqc); +} + +// Returns 0 for false, 1 for true +int8_t SWSSZmqClient_isConnected(SWSSZmqClient zmqc) { + SWSSTry(return ((ZmqClient *)zmqc)->isConnected() ? 1 : 0); +} + +void SWSSZmqClient_connect(SWSSZmqClient zmqc) { + SWSSTry(((ZmqClient *)zmqc)->connect()); +} + +void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, + SWSSKeyOpFieldValuesArray arr) { + SWSSTry({ + vector kcos = takeKeyOpFieldValuesArray(arr); + ((ZmqClient *)zmqc) + ->sendMsg(string(dbName), string(tableName), kcos); + }); +} diff --git a/common/c-api/zmqclient.h b/common/c-api/zmqclient.h new file mode 100644 index 00000000..da832ab3 --- /dev/null +++ b/common/c-api/zmqclient.h @@ -0,0 +1,30 @@ +#ifndef SWSS_COMMON_C_API_ZMQCLIENT_H +#define SWSS_COMMON_C_API_ZMQCLIENT_H + +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSZmqClientOpaque *SWSSZmqClient; + +SWSSZmqClient SWSSZmqClient_new(const char *endpoint); + +void SWSSZmqClient_free(SWSSZmqClient zmqc); + +// Returns 0 for false, 1 for true +int8_t SWSSZmqClient_isConnected(SWSSZmqClient zmqc); + +void SWSSZmqClient_connect(SWSSZmqClient zmqc); + +void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, + SWSSKeyOpFieldValuesArray kcos); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/zmqconsumerstatetable.cpp b/common/c-api/zmqconsumerstatetable.cpp new file mode 100644 index 00000000..ed416488 --- /dev/null +++ b/common/c-api/zmqconsumerstatetable.cpp @@ -0,0 +1,51 @@ +#include +#include "../zmqconsumerstatetable.h" +#include "../table.h" +#include "util.h" +#include "zmqconsumerstatetable.h" +#include "zmqserver.h" +#include + +using namespace swss; +using namespace std; +using boost::numeric_cast; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSZmqConsumerStateTable SWSSZmqConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqServer zmqs, + const int32_t *p_popBatchSize, + const int32_t *p_pri) { + + int popBatchSize = p_popBatchSize ? numeric_cast(*p_popBatchSize) + : TableConsumable::DEFAULT_POP_BATCH_SIZE; + int pri = p_pri ? numeric_cast(*p_pri) : 0; + SWSSTry(return (SWSSZmqConsumerStateTable) new ZmqConsumerStateTable( + (DBConnector *)db, string(tableName), *(ZmqServer *)zmqs, popBatchSize, pri)); +} + +void SWSSZmqConsumerStateTable_free(SWSSZmqConsumerStateTable tbl) { + SWSSTry(delete (ZmqConsumerStateTable *)tbl); +} + +SWSSKeyOpFieldValuesArray SWSSZmqConsumerStateTable_pops(SWSSZmqConsumerStateTable tbl) { + SWSSTry({ + deque vkco; + ((ZmqConsumerStateTable *)tbl)->pops(vkco); + return makeKeyOpFieldValuesArray(vkco); + }); +} + +uint32_t SWSSZmqConsumerStateTable_getFd(SWSSZmqConsumerStateTable tbl) { + SWSSTry(return numeric_cast(((ZmqConsumerStateTable *)tbl)->getFd())); +} + +SWSSSelectResult SWSSZmqConsumerStateTable_readData(SWSSZmqConsumerStateTable tbl, + uint32_t timeout_ms, + uint8_t interrupt_on_signal) { + SWSSTry(return selectOne((ZmqConsumerStateTable *)tbl, timeout_ms, interrupt_on_signal)); +} + +const struct SWSSDBConnectorOpaque * +SWSSZmqConsumerStateTable_getDbConnector(SWSSZmqConsumerStateTable tbl) { + SWSSTry(return (const SWSSDBConnectorOpaque *)((ZmqConsumerStateTable *)tbl)->getDbConnector()); +} diff --git a/common/c-api/zmqconsumerstatetable.h b/common/c-api/zmqconsumerstatetable.h new file mode 100644 index 00000000..f5b93425 --- /dev/null +++ b/common/c-api/zmqconsumerstatetable.h @@ -0,0 +1,46 @@ +#ifndef SWSS_COMMON_C_API_ZMQCONSUMERSTATETABLE_H +#define SWSS_COMMON_C_API_ZMQCONSUMERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" +#include "zmqserver.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSZmqConsumerStateTableOpaque *SWSSZmqConsumerStateTable; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSZmqConsumerStateTable SWSSZmqConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqServer zmqs, + const int32_t *popBatchSize, + const int32_t *pri); + +void SWSSZmqConsumerStateTable_free(SWSSZmqConsumerStateTable tbl); + +// Result array and all of its members must be freed using free() +SWSSKeyOpFieldValuesArray SWSSZmqConsumerStateTable_pops(SWSSZmqConsumerStateTable tbl); + +// Return the underlying fd for polling/selecting on. +// Callers must NOT read/write on fd, it may only be used for epoll or similar. +// After the fd becomes readable, SWSSZmqConsumerStateTable_readData must be used to +// reset the fd and read data into internal data structures. +uint32_t SWSSZmqConsumerStateTable_getFd(SWSSZmqConsumerStateTable tbl); + +// Block until data is available to read or until a timeout elapses. +// A timeout of 0 means the call will return immediately. +SWSSSelectResult SWSSZmqConsumerStateTable_readData(SWSSZmqConsumerStateTable tbl, + uint32_t timeout_ms, + uint8_t interrupt_on_signal); + +const struct SWSSDBConnectorOpaque * +SWSSZmqConsumerStateTable_getDbConnector(SWSSZmqConsumerStateTable tbl); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/zmqproducerstatetable.cpp b/common/c-api/zmqproducerstatetable.cpp new file mode 100644 index 00000000..e1c18680 --- /dev/null +++ b/common/c-api/zmqproducerstatetable.cpp @@ -0,0 +1,32 @@ +#include + +#include "zmqproducerstatetable.h" +#include "../zmqproducerstatetable.h" + +using namespace std; +using namespace swss; +using boost::numeric_cast; + +SWSSZmqProducerStateTable SWSSZmqProducerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqClient zmqc, uint8_t dbPersistence) { + + SWSSTry(return (SWSSZmqProducerStateTable) new ZmqProducerStateTable( + (DBConnector *)db, string(tableName), *(ZmqClient *)zmqc, dbPersistence)); +} + +void SWSSZmqProducerStateTable_free(SWSSZmqProducerStateTable tbl) { + SWSSTry(delete (ZmqProducerStateTable *)tbl); +} + +void SWSSZmqProducerStateTable_set(SWSSZmqProducerStateTable tbl, const char *key, + SWSSFieldValueArray values) { + SWSSTry(((ZmqProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(values))); +} + +void SWSSZmqProducerStateTable_del(SWSSZmqProducerStateTable tbl, const char *key) { + SWSSTry(((ZmqProducerStateTable *)tbl)->del(string(key))); +} + +uint64_t SWSSZmqProducerStateTable_dbUpdaterQueueSize(SWSSZmqProducerStateTable tbl) { + SWSSTry(return numeric_cast(((ZmqProducerStateTable *)tbl)->dbUpdaterQueueSize())); +} diff --git a/common/c-api/zmqproducerstatetable.h b/common/c-api/zmqproducerstatetable.h new file mode 100644 index 00000000..08d05918 --- /dev/null +++ b/common/c-api/zmqproducerstatetable.h @@ -0,0 +1,32 @@ +#ifndef SWSS_COMMON_C_API_ZMQPRODUCERSTATETABLE_H +#define SWSS_COMMON_C_API_ZMQPRODUCERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" +#include "zmqclient.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include "stdint.h" + +typedef struct SWSSZmqProducerStateTableOpaque *SWSSZmqProducerStateTable; + +SWSSZmqProducerStateTable SWSSZmqProducerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqClient zmqc, uint8_t dbPersistence); + +void SWSSZmqProducerStateTable_free(SWSSZmqProducerStateTable tbl); + +void SWSSZmqProducerStateTable_set(SWSSZmqProducerStateTable tbl, const char *key, + SWSSFieldValueArray values); + +void SWSSZmqProducerStateTable_del(SWSSZmqProducerStateTable tbl, const char *key); + +uint64_t SWSSZmqProducerStateTable_dbUpdaterQueueSize(SWSSZmqProducerStateTable tbl); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/zmqserver.cpp b/common/c-api/zmqserver.cpp new file mode 100644 index 00000000..50452e22 --- /dev/null +++ b/common/c-api/zmqserver.cpp @@ -0,0 +1,14 @@ +#include "zmqserver.h" +#include "../zmqserver.h" +#include "util.h" + +using namespace swss; +using namespace std; + +SWSSZmqServer SWSSZmqServer_new(const char *endpoint) { + SWSSTry(return (SWSSZmqServer) new ZmqServer(string(endpoint))); +} + +void SWSSZmqServer_free(SWSSZmqServer zmqs) { + SWSSTry(delete (ZmqServer *)zmqs); +} diff --git a/common/c-api/zmqserver.h b/common/c-api/zmqserver.h new file mode 100644 index 00000000..decd0e0d --- /dev/null +++ b/common/c-api/zmqserver.h @@ -0,0 +1,20 @@ +#ifndef SWSS_COMMON_C_API_ZMQSERVER_H +#define SWSS_COMMON_C_API_ZMQSERVER_H + +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SWSSZmqServerOpaque *SWSSZmqServer; + +SWSSZmqServer SWSSZmqServer_new(const char *endpoint); + +void SWSSZmqServer_free(SWSSZmqServer zmqs); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/dbconnector.cpp b/common/dbconnector.cpp index 0e044f3e..47fe80d3 100755 --- a/common/dbconnector.cpp +++ b/common/dbconnector.cpp @@ -562,7 +562,7 @@ void RedisContext::initContext(const char *host, int port, const timeval *tv) if (m_conn->err) throw system_error(make_error_code(errc::address_not_available), - "Unable to connect to redis"); + "Unable to connect to redis - " + std::string(m_conn->errstr) + "(" + std::to_string(m_conn->err) + ")"); } void RedisContext::initContext(const char *path, const timeval *tv) @@ -578,7 +578,7 @@ void RedisContext::initContext(const char *path, const timeval *tv) if (m_conn->err) throw system_error(make_error_code(errc::address_not_available), - "Unable to connect to redis (unix-socket)"); + "Unable to connect to redis (unix-socket) - " + std::string(m_conn->errstr) + "(" + std::to_string(m_conn->err) + ")"); } redisContext *RedisContext::getContext() const @@ -645,39 +645,46 @@ DBConnector::DBConnector(int dbId, const RedisContext& ctx) select(this); } +static struct timeval ms_to_timeval(unsigned int ms) { + return { + .tv_sec = (time_t)ms / 1000, + .tv_usec = ((suseconds_t)ms % 1000) * 1000 + }; +} + DBConnector::DBConnector(int dbId, const string& hostname, int port, - unsigned int timeout) + unsigned int timeout_ms) : m_dbId(dbId) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - struct timeval *ptv = timeout ? &tv : NULL; + struct timeval tv = ms_to_timeval(timeout_ms); + struct timeval *ptv = timeout_ms ? &tv : NULL; initContext(hostname.c_str(), port, ptv); select(this); } -DBConnector::DBConnector(int dbId, const string& unixPath, unsigned int timeout) +DBConnector::DBConnector(int dbId, const string& unixPath, unsigned int timeout_ms) : m_dbId(dbId) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - struct timeval *ptv = timeout ? &tv : NULL; + struct timeval tv = ms_to_timeval(timeout_ms); + struct timeval *ptv = timeout_ms ? &tv : NULL; initContext(unixPath.c_str(), ptv); select(this); } -DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpConn, const string& netns) - : DBConnector(dbName, timeout, isTcpConn, SonicDBKey(netns)) +DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn, const string& netns) + : DBConnector(dbName, timeout_ms, isTcpConn, SonicDBKey(netns)) { } -DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpConn, const SonicDBKey &key) +DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn, const SonicDBKey &key) : m_dbId(SonicDBConfig::getDbId(dbName, key)) , m_dbName(dbName) , m_key(key) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - struct timeval *ptv = timeout ? &tv : NULL; + struct timeval tv = ms_to_timeval(timeout_ms); + struct timeval *ptv = timeout_ms ? &tv : NULL; if (isTcpConn) { initContext(SonicDBConfig::getDbHostname(dbName, m_key).c_str(), SonicDBConfig::getDbPort(dbName, m_key), ptv); @@ -690,8 +697,8 @@ DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpC select(this); } -DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpConn) - : DBConnector(dbName, timeout, isTcpConn, SonicDBKey()) +DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn) + : DBConnector(dbName, timeout_ms, isTcpConn, SonicDBKey()) { // Empty constructor } diff --git a/common/dbconnector.h b/common/dbconnector.h index c5bd48ad..832983ed 100644 --- a/common/dbconnector.h +++ b/common/dbconnector.h @@ -213,11 +213,11 @@ class DBConnector : public RedisContext */ explicit DBConnector(const DBConnector &other); DBConnector(int dbId, const RedisContext &ctx); - DBConnector(int dbId, const std::string &hostname, int port, unsigned int timeout); - DBConnector(int dbId, const std::string &unixPath, unsigned int timeout); - DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn = false); - DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn, const std::string &netns); - DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn, const SonicDBKey &key); + DBConnector(int dbId, const std::string &hostname, int port, unsigned int timeout_ms); + DBConnector(int dbId, const std::string &unixPath, unsigned int timeout_ms); + DBConnector(const std::string &dbName, unsigned int timeout_ms, bool isTcpConn = false); + DBConnector(const std::string &dbName, unsigned int timeout_ms, bool isTcpConn, const std::string &netns); + DBConnector(const std::string &dbName, unsigned int timeout_ms, bool isTcpConn, const SonicDBKey &key); DBConnector& operator=(const DBConnector&) = delete; int getDbId() const; diff --git a/common/interface.h b/common/interface.h new file mode 100644 index 00000000..320ac883 --- /dev/null +++ b/common/interface.h @@ -0,0 +1,19 @@ +#ifndef __INTERFACE__ +#define __INTERFACE__ + +#include +#include + +namespace swss +{ + +const size_t IFACE_NAME_MAX_LEN = IFNAMSIZ - 1; + +bool isInterfaceNameValid(const std::string &ifaceName) +{ + return !ifaceName.empty() && (ifaceName.length() < IFNAMSIZ); +} + +} + +#endif diff --git a/common/performancetimer.cpp b/common/performancetimer.cpp new file mode 100644 index 00000000..400a55c8 --- /dev/null +++ b/common/performancetimer.cpp @@ -0,0 +1,133 @@ +#include "performancetimer.h" + +#include "logger.h" +#include +#include + +using namespace swss; + +bool PerformanceTimer::m_enable = true; +#define LIMIT 5 +#define INDICATOR "/var/log/syslog_notice_flag" + +PerformanceTimer::PerformanceTimer( + _In_ std::string funcName, + _In_ uint64_t threshold, + _In_ bool verbose): + m_name(funcName), + m_threshold(threshold), + m_verbose(verbose) +{ + reset(); + m_stop = std::chrono::steady_clock::now(); +} + +void PerformanceTimer::reset() +{ + SWSS_LOG_ENTER(); + + m_tasks = 0; + m_calls = 0; + m_busy = 0; + m_idle = 0; + + m_intervals.clear(); + m_gaps.clear(); + m_incs.clear(); +} + +void PerformanceTimer::start() +{ + SWSS_LOG_ENTER(); + + m_start = std::chrono::steady_clock::now(); + // measures the gap between this start() and the last stop() + m_gaps.push_back(std::chrono::duration_cast(m_start-m_stop).count()); +} + +void PerformanceTimer::stop() +{ + SWSS_LOG_ENTER(); + m_stop = std::chrono::steady_clock::now(); +} + +std::string PerformanceTimer::inc(uint64_t count) +{ + SWSS_LOG_ENTER(); + + std::string output = ""; + + m_calls += 1; + + m_tasks += count; + + m_idle += m_gaps.back(); + + uint64_t interval = std::chrono::duration_cast(m_stop - m_start).count(); + + m_busy += interval; + + if (count == 0) { + m_gaps.pop_back(); + m_calls -= 1; + return output; + } + + if (m_incs.size() <= LIMIT) { + m_incs.push_back(count); + m_intervals.push_back(interval/1000000); + } else { + m_gaps.pop_back(); + } + + if (m_tasks >= m_threshold) + { + uint64_t mseconds = m_busy/1000000; + + if (m_enable && mseconds > 0) + { + output = getTimerState(); + std::ifstream indicator(INDICATOR); + if (indicator.good()) { + SWSS_LOG_NOTICE("%s", output.c_str()); + } else { + SWSS_LOG_INFO("%s", output.c_str()); + } + } + + reset(); + } + + return output; +} + +std::string PerformanceTimer::getTimerState() +{ + nlohmann::json data; + data["API"] = m_name; + data["Tasks"] = m_tasks; + data["busy[ms]"] = m_busy/1000000; + data["idle[ms]"] = m_idle; + data["Total[ms]"] = m_busy/1000000 + m_idle; + double ratio = static_cast(m_tasks) / static_cast(m_busy/1000000 + m_idle); + data["RPS[k]"] = std::round(ratio * 10.0) / 10.0f; + if (m_verbose) { + data["m_intervals"] = m_intervals; + data["m_gaps"] = m_gaps; + data["m_incs"] = m_incs; + } + + return data.dump(); +} + +void PerformanceTimer::setTimerName(const std::string& funcName) { + m_name = funcName; +} + +void PerformanceTimer::setTimerThreshold(uint64_t threshold) { + m_threshold = threshold; +} + +void PerformanceTimer::setTimerVerbose(bool verbose) { + m_verbose = verbose; +} diff --git a/common/performancetimer.h b/common/performancetimer.h new file mode 100644 index 00000000..545aeeae --- /dev/null +++ b/common/performancetimer.h @@ -0,0 +1,63 @@ +#pragma once + +#include "sal.h" +#include + +#include +#include +#include +#include +namespace swss +{ + class PerformanceTimer + { + public: + + PerformanceTimer( + _In_ std::string funcName = "", + _In_ uint64_t threshold = 10000, + _In_ bool verbose = false + ); + + ~PerformanceTimer() = default; + + public: + + void start(); + + void stop(); + + std::string inc(uint64_t count = 1); + + void reset(); + + std::string getTimerState(); + + static bool m_enable; + + void setTimerName(const std::string& funcName); + void setTimerThreshold(uint64_t threshold); + void setTimerVerbose(bool verbose); + + private: + + std::string m_name; // records what this timer measures about + uint64_t m_threshold; // reset the timer when the m_tasks reachs m_threshold + bool m_verbose; // decides whether to print in verbose when m_threshold is reached + + std::chrono::time_point m_start; + std::chrono::time_point m_stop; + + /* records how long the timer has idled between last stop and this start */ + std::deque m_gaps; + /* records how long each call takes */ + std::deque m_intervals; + /* records how many tasks each call finishes */ + std::deque m_incs; + + uint64_t m_tasks; // sum of m_incs + uint64_t m_calls; // how many times the timer is used + uint64_t m_busy; // sum of m_intervals + uint64_t m_idle; // sum of m_gaps + }; +} diff --git a/common/producerstatetable.cpp b/common/producerstatetable.cpp index d0db5e2a..c7a35475 100644 --- a/common/producerstatetable.cpp +++ b/common/producerstatetable.cpp @@ -14,39 +14,71 @@ using namespace std; namespace swss { ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName) - : ProducerStateTable(new RedisPipeline(db, 1), tableName, false) + : ProducerStateTable(new RedisPipeline(db, 1), tableName, false, false) { m_pipeowned = true; } ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered) + : ProducerStateTable(pipeline, tableName, buffered, false) {} + +ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub) : TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector())) , TableName_KeySet(tableName) + , m_flushPub(flushPub) , m_buffered(buffered) , m_pipeowned(false) , m_tempViewActive(false) , m_pipe(pipeline) { + reloadRedisScript(); + + string luaClear = + "redis.call('DEL', KEYS[1])\n" + "local keys = redis.call('KEYS', KEYS[2] .. '*')\n" + "for i,k in pairs(keys) do\n" + " redis.call('DEL', k)\n" + "end\n" + "redis.call('DEL', KEYS[3])\n"; + m_shaClear = m_pipe->loadRedisScript(luaClear); + + string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua"); + m_shaApplyView = m_pipe->loadRedisScript(luaApplyView); +} + +ProducerStateTable::~ProducerStateTable() +{ + if (m_pipeowned) + { + delete m_pipe; + } +} + +void ProducerStateTable::reloadRedisScript() +{ + // Set m_flushPub to remove publish from a single lua string and let pipeline do publish once per flush + + // However, if m_buffered is false, follow the original one publish per lua design + // Hence we need to check both m_buffered and m_flushPub, and reload the redis script once setBuffered() changes m_buffered + + /* 1. Inform the pipeline of what channel to publish, when flushPub feature is enabled */ + if (m_buffered && m_flushPub) + m_pipe->addChannel(getChannelName(m_pipe->getDbId())); + + /* 2. Setup lua strings: determine whether to attach luaPub after each lua string */ + // num in luaSet and luaDel means number of elements that were added to the key set, // not including all the elements already present into the set. string luaSet = "local added = redis.call('SADD', KEYS[2], ARGV[2])\n" "for i = 0, #KEYS - 3 do\n" " redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n" - "end\n" - " if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; - m_shaSet = m_pipe->loadRedisScript(luaSet); string luaDel = "local added = redis.call('SADD', KEYS[2], ARGV[2])\n" "redis.call('SADD', KEYS[4], ARGV[2])\n" - "redis.call('DEL', KEYS[3])\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" - "end\n"; - m_shaDel = m_pipe->loadRedisScript(luaDel); + "redis.call('DEL', KEYS[3])\n"; string luaBatchedSet = "local added = 0\n" @@ -59,11 +91,7 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n" " end\n" " idx = idx + tonumber(ARGV[idx]) * 2 + 1\n" - "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; - m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet); string luaBatchedDel = "local added = 0\n" @@ -71,36 +99,31 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n" " redis.call('SADD', KEYS[3], KEYS[5 + i])\n" " redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n" - "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; - m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel); - string luaClear = - "redis.call('DEL', KEYS[1])\n" - "local keys = redis.call('KEYS', KEYS[2] .. '*')\n" - "for i,k in pairs(keys) do\n" - " redis.call('DEL', k)\n" - "end\n" - "redis.call('DEL', KEYS[3])\n"; - m_shaClear = m_pipe->loadRedisScript(luaClear); - - string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua"); - m_shaApplyView = m_pipe->loadRedisScript(luaApplyView); -} - -ProducerStateTable::~ProducerStateTable() -{ - if (m_pipeowned) + if (!m_flushPub || !m_buffered) { - delete m_pipe; + string luaPub = + "if added > 0 then \n" + " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" + "end\n"; + luaSet += luaPub; + luaDel += luaPub; + luaBatchedSet += luaPub; + luaBatchedDel += luaPub; } + + /* 3. load redis script based on the lua string */ + m_shaSet = m_pipe->loadRedisScript(luaSet); + m_shaDel = m_pipe->loadRedisScript(luaDel); + m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet); + m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel); } void ProducerStateTable::setBuffered(bool buffered) { m_buffered = buffered; + reloadRedisScript(); } void ProducerStateTable::set(const string &key, const vector &values, diff --git a/common/producerstatetable.h b/common/producerstatetable.h index b6fa7868..b00453a5 100644 --- a/common/producerstatetable.h +++ b/common/producerstatetable.h @@ -12,6 +12,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet public: ProducerStateTable(DBConnector *db, const std::string &tableName); ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false); + ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered, bool flushPub); virtual ~ProducerStateTable(); void setBuffered(bool buffered); @@ -51,6 +52,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet void apply_temp_view(); private: + bool m_flushPub; // publish per piepeline flush intead of per redis script bool m_buffered; bool m_pipeowned; bool m_tempViewActive; @@ -62,6 +64,8 @@ class ProducerStateTable : public TableBase, public TableName_KeySet std::string m_shaClear; std::string m_shaApplyView; TableDump m_tempViewState; + + void reloadRedisScript(); // redis script may change if m_buffered changes }; } diff --git a/common/redispipeline.h b/common/redispipeline.h index b8efa384..be7561b6 100644 --- a/common/redispipeline.h +++ b/common/redispipeline.h @@ -2,7 +2,10 @@ #include #include +#include #include +#include +#include #include "redisreply.h" #include "rediscommand.h" #include "dbconnector.h" @@ -22,9 +25,11 @@ class RedisPipeline { RedisPipeline(const DBConnector *db, size_t sz = 128) : COMMAND_MAX(sz) , m_remaining(0) + , m_shaPub("") { m_db = db->newConnector(NEWCONNECTOR_TIMEOUT); initializeOwnerTid(); + lastHeartBeat = std::chrono::steady_clock::now(); } ~RedisPipeline() { @@ -113,11 +118,19 @@ class RedisPipeline { void flush() { + lastHeartBeat = std::chrono::steady_clock::now(); + + if (m_remaining == 0) { + return; + } + while(m_remaining) { // Construct an object to use its dtor, so that resource is released RedisReply r(pop()); } + + publish(); } size_t size() @@ -145,12 +158,43 @@ class RedisPipeline { m_ownerTid = gettid(); } + void addChannel(std::string channel) + { + if (m_channels.find(channel) != m_channels.end()) + return; + + m_channels.insert(channel); + m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');"; + m_shaPub = loadRedisScript(m_luaPub); + } + + int getIdleTime(std::chrono::time_point tcurrent=std::chrono::steady_clock::now()) + { + return static_cast(std::chrono::duration_cast(tcurrent - lastHeartBeat).count()); + } + + void publish() { + if (m_shaPub.empty()) { + return; + } + RedisCommand cmd; + cmd.format( + "EVALSHA %s 0", + m_shaPub.c_str()); + RedisReply r(m_db, cmd); + } + private: DBConnector *m_db; std::queue m_expectedTypes; size_t m_remaining; long int m_ownerTid; + std::string m_luaPub; + std::string m_shaPub; + std::chrono::time_point lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked + std::unordered_set m_channels; + void mayflush() { if (m_remaining >= COMMAND_MAX) diff --git a/common/saiaclschema.cpp b/common/saiaclschema.cpp index 6fd32214..88c6f517 100644 --- a/common/saiaclschema.cpp +++ b/common/saiaclschema.cpp @@ -328,5 +328,32 @@ const ActionSchema &ActionSchemaByName(const std::string &action_name) return lookup->second; } +const ActionSchema& ActionSchemaByNameAndObjectType( + const std::string& action_name, const std::string& object_type) { + static const auto* const kRedirectObjectTypes = + new std::unordered_map({ + {"SAI_OBJECT_TYPE_IPMC_GROUP", + {.format = Format::kHexString, .bitwidth = 16}}, + {"SAI_OBJECT_TYPE_L2MC_GROUP", + {.format = Format::kHexString, .bitwidth = 16}}, + // SAI_OBJECT_TYPE_BRIDGE_PORT + // SAI_OBJECT_TYPE_LAG + // SAI_OBJECT_TYPE_NEXT_HOP + // SAI_OBJECT_TYPE_NEXT_HOP_GROUP + // SAI_OBJECT_TYPE_PORT + // SAI_OBJECT_TYPE_SYSTEM_PORT + }); + + if (action_name == "SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT") { + auto lookup = kRedirectObjectTypes->find(object_type); + if (lookup != kRedirectObjectTypes->end()) { + return lookup->second; + } + } + // If we haven't defined the object type, fall through to the default + // SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT format. + return ActionSchemaByName(action_name); +} + } // namespace acl } // namespace swss diff --git a/common/saiaclschema.h b/common/saiaclschema.h index 156148b1..88e66423 100644 --- a/common/saiaclschema.h +++ b/common/saiaclschema.h @@ -83,6 +83,10 @@ const MatchFieldSchema &MatchFieldSchemaByName(const std::string &match_field_na // Throws std::invalid_argument for unknown actions and actions without schemas. const ActionSchema &ActionSchemaByName(const std::string &action_name); +// Allow further format differentiation based on a SAI object type. +const ActionSchema& ActionSchemaByNameAndObjectType( + const std::string& action_name, const std::string& object_type); + } // namespace acl } // namespace swss diff --git a/common/schema.h b/common/schema.h index 27aecdb0..e99f2ad8 100644 --- a/common/schema.h +++ b/common/schema.h @@ -103,6 +103,8 @@ namespace swss { #define APP_NAPT_POOL_IP_TABLE_NAME "NAPT_POOL_IP_TABLE" #define APP_NAT_DNAT_POOL_TABLE_NAME "NAT_DNAT_POOL_TABLE" +#define APP_VRRP_TABLE_NAME "VRRP_TABLE" + #define APP_STP_VLAN_TABLE_NAME "STP_VLAN_TABLE" #define APP_STP_VLAN_PORT_TABLE_NAME "STP_VLAN_PORT_TABLE" #define APP_STP_VLAN_INSTANCE_TABLE_NAME "STP_VLAN_INSTANCE_TABLE" @@ -156,6 +158,7 @@ namespace swss { #define APP_SRV6_SID_LIST_TABLE_NAME "SRV6_SID_LIST_TABLE" #define APP_SRV6_MY_SID_TABLE_NAME "SRV6_MY_SID_TABLE" +#define APP_PIC_CONTEXT_TABLE_NAME "PIC_CONTEXT_TABLE" #define APP_DASH_VNET_TABLE_NAME "DASH_VNET_TABLE" #define APP_DASH_QOS_TABLE_NAME "DASH_QOS_TABLE" @@ -174,6 +177,8 @@ namespace swss { #define APP_DASH_ROUTE_GROUP_TABLE_NAME "DASH_ROUTE_GROUP_TABLE" #define APP_DASH_TUNNEL_TABLE_NAME "DASH_TUNNEL_TABLE" #define APP_DASH_PA_VALIDATION_TABLE_NAME "DASH_PA_VALIDATION_TABLE" +#define APP_DASH_METER_POLICY_TABLE_NAME "DASH_METER_POLICY_TABLE" +#define APP_DASH_METER_RULE_TABLE_NAME "DASH_METER_RULE_TABLE" #define APP_DASH_ROUTING_APPLIANCE_TABLE_NAME "DASH_ROUTING_APPLIANCE_TABLE" #define APP_PAC_PORT_TABLE_NAME "PAC_PORT_TABLE" @@ -258,6 +263,7 @@ namespace swss { #define QUEUE_ATTR_ID_LIST "QUEUE_ATTR_ID_LIST" #define BUFFER_POOL_COUNTER_ID_LIST "BUFFER_POOL_COUNTER_ID_LIST" #define ENI_COUNTER_ID_LIST "ENI_COUNTER_ID_LIST" +#define DASH_METER_COUNTER_ID_LIST "DASH_METER_COUNTER_ID_LIST" #define PFC_WD_STATE_TABLE "PFC_WD_STATE_TABLE" #define PFC_WD_PORT_COUNTER_ID_LIST "PORT_COUNTER_ID_LIST" #define PFC_WD_QUEUE_COUNTER_ID_LIST "QUEUE_COUNTER_ID_LIST" @@ -423,7 +429,8 @@ namespace swss { #define CFG_MCLAG_UNIQUE_IP_TABLE_NAME "MCLAG_UNIQUE_IP" #define CFG_PORT_STORM_CONTROL_TABLE_NAME "PORT_STORM_CONTROL" - +#define CFG_VRRP_TABLE_NAME "VRRP" +#define CFG_VRRP6_TABLE_NAME "VRRP6" #define CFG_RATES_TABLE_NAME "RATES" #define CFG_FEATURE_TABLE_NAME "FEATURE" @@ -457,6 +464,7 @@ namespace swss { #define CFG_TWAMP_SESSION_TABLE_NAME "TWAMP_SESSION" #define CFG_BANNER_MESSAGE_TABLE_NAME "BANNER_MESSAGE" +#define CFG_LOGGING_TABLE_NAME "LOGGING" #define CFG_DHCP_TABLE "DHCP_RELAY" @@ -478,6 +486,7 @@ namespace swss { #define STATE_ACL_STAGE_CAPABILITY_TABLE_NAME "ACL_STAGE_CAPABILITY_TABLE" #define STATE_PBH_CAPABILITIES_TABLE_NAME "PBH_CAPABILITIES" #define STATE_PORT_TABLE_NAME "PORT_TABLE" +#define STATE_PORT_OPER_ERR_TABLE_NAME "PORT_OPERR_TABLE" #define STATE_LAG_TABLE_NAME "LAG_TABLE" #define STATE_VLAN_TABLE_NAME "VLAN_TABLE" #define STATE_VLAN_MEMBER_TABLE_NAME "VLAN_MEMBER_TABLE" diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 3ed5bcf7..d112cc55 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -51,6 +51,7 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) m_context = nullptr; m_socket = nullptr; m_vrf = vrf; + m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); connect(); } @@ -116,12 +117,11 @@ void ZmqClient::connect() void ZmqClient::sendMsg( const std::string& dbName, const std::string& tableName, - const std::vector& kcos, - std::vector& sendbuffer) + const std::vector& kcos) { int serializedlen = (int)BinarySerializer::serializeBuffer( - sendbuffer.data(), - sendbuffer.size(), + m_sendbuffer.data(), + m_sendbuffer.size(), dbName, tableName, kcos); @@ -144,7 +144,7 @@ void ZmqClient::sendMsg( std::lock_guard lock(m_socketMutex); // Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send - rc = zmq_send(m_socket, sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); + rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); } if (rc >= 0) @@ -198,65 +198,34 @@ void ZmqClient::sendMsg( } bool ZmqClient::wait(std::string& dbName, - std::string& tableName, - - std::vector>& kcos, - - std::vector& buffer) - + std::vector>& kcos) { - SWSS_LOG_ENTER(); - int rc; - for (int i = 0; true ; ++i) - { - - rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0); - + rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0); if (rc < 0) - { - if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) - { - continue; - } - SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); - } - - if (rc >= (int)buffer.size()) - + if (rc >= (int)m_sendbuffer.size()) { - SWSS_LOG_THROW( - "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - - (int)buffer.size(), rc); - + (int)m_sendbuffer.size(), rc); } - break; - } - - buffer.at(rc) = 0; // make sure that we end string with zero before parse - + m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse kcos.clear(); - - BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos); - + BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos); return true; - } } diff --git a/common/zmqclient.h b/common/zmqclient.h index 79b4d766..349d6222 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -23,16 +23,11 @@ class ZmqClient void sendMsg(const std::string& dbName, const std::string& tableName, - const std::vector& kcos, - std::vector& sendbuffer); + const std::vector& kcos); bool wait(std::string& dbName, - std::string& tableName, - - std::vector>& kcos, - - std::vector& buffer); + std::vector>& kcos); private: void initialize(const std::string& endpoint, const std::string& vrf); @@ -48,6 +43,8 @@ class ZmqClient bool m_connected; std::mutex m_socketMutex; + + std::vector m_sendbuffer; }; } diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index c171163f..6c9866be 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -38,8 +38,6 @@ ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const stri void ZmqProducerStateTable::initialize(DBConnector *db, const std::string &tableName, bool dbPersistence) { - m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); - if (dbPersistence) { SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); @@ -64,8 +62,7 @@ void ZmqProducerStateTable::set( m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -93,8 +90,7 @@ void ZmqProducerStateTable::del( m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -112,8 +108,7 @@ void ZmqProducerStateTable::set(const std::vector &value m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - values, - m_sendbuffer); + values); if (m_asyncDBUpdater != nullptr) { @@ -136,8 +131,7 @@ void ZmqProducerStateTable::del(const std::vector &keys) m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -157,8 +151,7 @@ void ZmqProducerStateTable::send(const std::vector &kcos m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -172,15 +165,10 @@ void ZmqProducerStateTable::send(const std::vector &kcos } bool ZmqProducerStateTable::wait(std::string& dbName, - std::string& tableName, - std::vector>& kcos) - { - - return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer); - + return m_zmqClient.wait(dbName, tableName, kcos); } size_t ZmqProducerStateTable::dbUpdaterQueueSize() diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 3c794237..7ee54cc0 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -37,12 +37,8 @@ class ZmqProducerStateTable : public ProducerStateTable // Batched send that can include both SET and DEL requests. virtual void send(const std::vector &kcos); - // This method should only be used if the ZmqClient enables one-to-one sync. - virtual bool wait(std::string& dbName, - std::string& tableName, - std::vector>& kcos); size_t dbUpdaterQueueSize(); @@ -50,8 +46,6 @@ class ZmqProducerStateTable : public ProducerStateTable void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); ZmqClient& m_zmqClient; - - std::vector m_sendbuffer; const std::string m_dbName; const std::string m_tableNameStr; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index dbca01df..1db94eb0 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -18,7 +18,8 @@ ZmqServer::ZmqServer(const std::string& endpoint) ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) : m_endpoint(endpoint), - m_vrf(vrf), m_allowZmqPoll(true) + m_vrf(vrf), + m_allowZmqPoll(true) { m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; @@ -32,11 +33,6 @@ ZmqServer::~ZmqServer() m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); - - zmq_close(m_socket); - - zmq_ctx_destroy(m_context); - } void ZmqServer::registerMessageHandler( diff --git a/common/zmqserver.h b/common/zmqserver.h index d5b36d39..59aef344 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -59,10 +59,6 @@ class ZmqServer std::string m_vrf; - void* m_context; - - void* m_socket; - bool m_allowZmqPoll; std::map> m_HandlerMap; diff --git a/debian/libswsscommon-dev.install b/debian/libswsscommon-dev.install index 1dd2670e..85e3c4bc 100644 --- a/debian/libswsscommon-dev.install +++ b/debian/libswsscommon-dev.install @@ -1,2 +1,3 @@ common/*.h usr/include/swss +common/c-api/*.h usr/include/swss/c-api pyext/swsscommon.i usr/share/swss diff --git a/goext/Makefile b/goext/Makefile index d46d9908..76fddb7f 100644 --- a/goext/Makefile +++ b/goext/Makefile @@ -17,7 +17,7 @@ all: $(SWIG) $(SWIG_FLAG) -I/usr/include/swss/ swsscommon.i check: - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) build + $(GO) mod init goext sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test clean: diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index 2bf953b1..b3d015e0 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -58,6 +58,7 @@ #include "zmqproducerstatetable.h" #include #include +#include "interface.h" %} %include @@ -282,6 +283,7 @@ T castSelectableObj(swss::Selectable *temp) %include "zmqserver.h" %include "zmqclient.h" %include "zmqconsumerstatetable.h" +%include "interface.h" %extend swss::DBConnector { %template(hgetall) hgetall>; @@ -296,7 +298,7 @@ T castSelectableObj(swss::Selectable *temp) %include "table.h" #ifdef ENABLE_YANG_MODULES %include "decoratortable.h" -#endif +#endif %clear std::vector &keys; %clear std::vector &ops; %clear std::vector>> &fvss; diff --git a/tests/Makefile.am b/tests/Makefile.am index a07fadc2..9642b09a 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -42,6 +42,8 @@ tests_tests_SOURCES = tests/redis_ut.cpp \ tests/binary_serializer_ut.cpp \ tests/zmq_state_ut.cpp \ tests/profileprovider_ut.cpp \ + tests/c_api_ut.cpp \ + tests/performancetimer_ut.cpp \ tests/main.cpp tests_tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(LIBNL_CFLAGS) diff --git a/tests/c_api_ut.cpp b/tests/c_api_ut.cpp new file mode 100644 index 00000000..ed814607 --- /dev/null +++ b/tests/c_api_ut.cpp @@ -0,0 +1,335 @@ +#include +#include + +#include "common/c-api/consumerstatetable.h" +#include "common/c-api/dbconnector.h" +#include "common/c-api/producerstatetable.h" +#include "common/c-api/subscriberstatetable.h" +#include "common/c-api/util.h" +#include "common/c-api/zmqclient.h" +#include "common/c-api/zmqconsumerstatetable.h" +#include "common/c-api/zmqproducerstatetable.h" +#include "common/c-api/zmqserver.h" +#include "common/select.h" +#include "common/subscriberstatetable.h" +#include "gtest/gtest.h" + +using namespace std; +using namespace swss; + +static void clearDB() { + DBConnector db("TEST_DB", 0, true); + RedisReply r(&db, "FLUSHALL", REDIS_REPLY_STATUS); + r.checkStatusOK(); +} + +static void sortKfvs(vector &kfvs) { + sort(kfvs.begin(), kfvs.end(), + [](const KeyOpFieldsValuesTuple &a, const KeyOpFieldsValuesTuple &b) { + return kfvKey(a) < kfvKey(b); + }); + + for (auto &kfv : kfvs) { + auto &fvs = kfvFieldsValues(kfv); + sort(fvs.begin(), fvs.end(), + [](const pair &a, const pair &b) { + return a.first < b.first; + }); + } +} + +#define free(x) std::free(const_cast(reinterpret_cast(x))); + +static void freeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray arr) { + for (uint64_t i = 0; i < arr.len; i++) { + free(arr.data[i].key); + for (uint64_t j = 0; j < arr.data[i].fieldValues.len; j++) { + free(arr.data[i].fieldValues.data[j].field); + SWSSString_free(arr.data[i].fieldValues.data[j].value); + } + SWSSFieldValueArray_free(arr.data[i].fieldValues); + } + SWSSKeyOpFieldValuesArray_free(arr); +} + +struct SWSSStringManager { + vector m_strings; + + SWSSString makeString(const char *c_str) { + SWSSString s = SWSSString_new_c_str(c_str); + m_strings.push_back(s); + return s; + } + + SWSSStrRef makeStrRef(const char *c_str) { + return (SWSSStrRef)makeString(c_str); + } + + ~SWSSStringManager() { + for (SWSSString s : m_strings) + SWSSString_free(s); + } +}; + +TEST(c_api, DBConnector) { + clearDB(); + SWSSStringManager sm; + + EXPECT_THROW(SWSSDBConnector_new_named("does not exist", 0, true), out_of_range); + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + EXPECT_EQ(SWSSDBConnector_get(db, "mykey"), nullptr); + EXPECT_FALSE(SWSSDBConnector_exists(db, "mykey")); + + SWSSDBConnector_set(db, "mykey", sm.makeStrRef("myval")); + SWSSString val = SWSSDBConnector_get(db, "mykey"); + EXPECT_STREQ(SWSSStrRef_c_str((SWSSStrRef)val), "myval"); + SWSSString_free(val); + EXPECT_TRUE(SWSSDBConnector_exists(db, "mykey")); + EXPECT_TRUE(SWSSDBConnector_del(db, "mykey")); + EXPECT_FALSE(SWSSDBConnector_del(db, "mykey")); + + EXPECT_FALSE(SWSSDBConnector_hget(db, "mykey", "myfield")); + EXPECT_FALSE(SWSSDBConnector_hexists(db, "mykey", "myfield")); + SWSSDBConnector_hset(db, "mykey", "myfield", sm.makeStrRef("myval")); + val = SWSSDBConnector_hget(db, "mykey", "myfield"); + EXPECT_STREQ(SWSSStrRef_c_str((SWSSStrRef)val), "myval"); + SWSSString_free(val); + + EXPECT_TRUE(SWSSDBConnector_hexists(db, "mykey", "myfield")); + EXPECT_FALSE(SWSSDBConnector_hget(db, "mykey", "notmyfield")); + EXPECT_FALSE(SWSSDBConnector_hexists(db, "mykey", "notmyfield")); + EXPECT_TRUE(SWSSDBConnector_hdel(db, "mykey", "myfield")); + EXPECT_FALSE(SWSSDBConnector_hdel(db, "mykey", "myfield")); + + EXPECT_TRUE(SWSSDBConnector_flushdb(db)); + SWSSDBConnector_free(db); +} + +TEST(c_api, ConsumerProducerStateTables) { + clearDB(); + SWSSStringManager sm; + + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + SWSSProducerStateTable pst = SWSSProducerStateTable_new(db, "mytable"); + SWSSConsumerStateTable cst = SWSSConsumerStateTable_new(db, "mytable", nullptr, nullptr); + + SWSSConsumerStateTable_getFd(cst); + + SWSSKeyOpFieldValuesArray arr = SWSSConsumerStateTable_pops(cst); + ASSERT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + SWSSFieldValueTuple data[2] = { + {.field = "myfield1", .value = sm.makeString("myvalue1")}, + {.field = "myfield2", .value = sm.makeString("myvalue2")}}; + SWSSFieldValueArray values = { + .len = 2, + .data = data, + }; + SWSSProducerStateTable_set(pst, "mykey1", values); + + data[0] = {.field = "myfield3", .value = sm.makeString("myvalue3")}; + values.len = 1; + SWSSProducerStateTable_set(pst, "mykey2", values); + + ASSERT_EQ(SWSSConsumerStateTable_readData(cst, 300, true), SWSSSelectResult_DATA); + arr = SWSSConsumerStateTable_pops(cst); + vector kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 2); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey1"); + EXPECT_EQ(kfvOp(kfvs[0]), "SET"); + vector> &fieldValues0 = kfvFieldsValues(kfvs[0]); + ASSERT_EQ(fieldValues0.size(), 2); + EXPECT_EQ(fieldValues0[0].first, "myfield1"); + EXPECT_EQ(fieldValues0[0].second, "myvalue1"); + EXPECT_EQ(fieldValues0[1].first, "myfield2"); + EXPECT_EQ(fieldValues0[1].second, "myvalue2"); + + EXPECT_EQ(kfvKey(kfvs[1]), "mykey2"); + EXPECT_EQ(kfvOp(kfvs[1]), "SET"); + vector> &fieldValues1 = kfvFieldsValues(kfvs[1]); + ASSERT_EQ(fieldValues1.size(), 1); + EXPECT_EQ(fieldValues1[0].first, "myfield3"); + EXPECT_EQ(fieldValues1[0].second, "myvalue3"); + + arr = SWSSConsumerStateTable_pops(cst); + EXPECT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + SWSSProducerStateTable_del(pst, "mykey3"); + SWSSProducerStateTable_del(pst, "mykey4"); + SWSSProducerStateTable_del(pst, "mykey5"); + + arr = SWSSConsumerStateTable_pops(cst); + kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 3); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey3"); + EXPECT_EQ(kfvOp(kfvs[0]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[0]).size(), 0); + EXPECT_EQ(kfvKey(kfvs[1]), "mykey4"); + EXPECT_EQ(kfvOp(kfvs[1]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[1]).size(), 0); + EXPECT_EQ(kfvKey(kfvs[2]), "mykey5"); + EXPECT_EQ(kfvOp(kfvs[2]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[2]).size(), 0); + + SWSSProducerStateTable_free(pst); + SWSSConsumerStateTable_free(cst); + SWSSDBConnector_flushdb(db); + SWSSDBConnector_free(db); +} + +TEST(c_api, SubscriberStateTable) { + clearDB(); + SWSSStringManager sm; + + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + SWSSSubscriberStateTable sst = SWSSSubscriberStateTable_new(db, "mytable", nullptr, nullptr); + + SWSSSubscriberStateTable_getFd(sst); + + EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300, true), SWSSSelectResult_TIMEOUT); + SWSSKeyOpFieldValuesArray arr = SWSSSubscriberStateTable_pops(sst); + EXPECT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + SWSSDBConnector_hset(db, "mytable:mykey", "myfield", sm.makeStrRef("myvalue")); + EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300, true), SWSSSelectResult_DATA); + arr = SWSSSubscriberStateTable_pops(sst); + vector kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 1); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey"); + EXPECT_EQ(kfvOp(kfvs[0]), "SET"); + ASSERT_EQ(kfvFieldsValues(kfvs[0]).size(), 1); + EXPECT_EQ(kfvFieldsValues(kfvs[0])[0].first, "myfield"); + EXPECT_EQ(kfvFieldsValues(kfvs[0])[0].second, "myvalue"); + + SWSSSubscriberStateTable_free(sst); + SWSSDBConnector_flushdb(db); + SWSSDBConnector_free(db); +} + +TEST(c_api, ZmqConsumerProducerStateTable) { + clearDB(); + SWSSStringManager sm; + + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + + SWSSZmqServer srv = SWSSZmqServer_new("tcp://127.0.0.1:42312"); + SWSSZmqClient cli = SWSSZmqClient_new("tcp://127.0.0.1:42312"); + EXPECT_TRUE(SWSSZmqClient_isConnected(cli)); + SWSSZmqClient_connect(cli); // This should be idempotent/not throw + + SWSSZmqProducerStateTable pst = SWSSZmqProducerStateTable_new(db, "mytable", cli, false); + SWSSZmqConsumerStateTable cst = + SWSSZmqConsumerStateTable_new(db, "mytable", srv, nullptr, nullptr); + + SWSSZmqConsumerStateTable_getFd(cst); + + ASSERT_EQ(SWSSZmqConsumerStateTable_getDbConnector(cst), db); + + SWSSKeyOpFieldValuesArray arr = SWSSZmqConsumerStateTable_pops(cst); + ASSERT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + // On flag = 0, we use the ZmqProducerStateTable + // On flag = 1, we use the ZmqClient directly + for (int flag = 0; flag < 2; flag++) { + SWSSFieldValueTuple values_key1_data[2] = {{.field = "myfield1", .value = sm.makeString("myvalue1")}, + {.field = "myfield2", .value = sm.makeString("myvalue2")}}; + SWSSFieldValueArray values_key1 = { + .len = 2, + .data = values_key1_data, + }; + + SWSSFieldValueTuple values_key2_data[1] = {{.field = "myfield3", .value = sm.makeString("myvalue3")}}; + SWSSFieldValueArray values_key2 = { + .len = 1, + .data = values_key2_data, + }; + + SWSSKeyOpFieldValues arr_data[2] = { + {.key = "mykey1", .operation = SWSSKeyOperation_SET, .fieldValues = values_key1}, + {.key = "mykey2", .operation = SWSSKeyOperation_SET, .fieldValues = values_key2}}; + arr = {.len = 2, .data = arr_data}; + + if (flag == 0) + for (uint64_t i = 0; i < arr.len; i++) + SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues); + else + SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); + + ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA); + arr = SWSSZmqConsumerStateTable_pops(cst); + + vector kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 2); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey1"); + EXPECT_EQ(kfvOp(kfvs[0]), "SET"); + vector> &fieldValues0 = kfvFieldsValues(kfvs[0]); + ASSERT_EQ(fieldValues0.size(), 2); + EXPECT_EQ(fieldValues0[0].first, "myfield1"); + EXPECT_EQ(fieldValues0[0].second, "myvalue1"); + EXPECT_EQ(fieldValues0[1].first, "myfield2"); + EXPECT_EQ(fieldValues0[1].second, "myvalue2"); + + EXPECT_EQ(kfvKey(kfvs[1]), "mykey2"); + EXPECT_EQ(kfvOp(kfvs[1]), "SET"); + vector> &fieldValues1 = kfvFieldsValues(kfvs[1]); + ASSERT_EQ(fieldValues1.size(), 1); + EXPECT_EQ(fieldValues1[0].first, "myfield3"); + EXPECT_EQ(fieldValues1[0].second, "myvalue3"); + + arr = SWSSZmqConsumerStateTable_pops(cst); + ASSERT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + arr_data[0] = {.key = "mykey3", .operation = SWSSKeyOperation_DEL, .fieldValues = {}}; + arr_data[1] = {.key = "mykey4", .operation = SWSSKeyOperation_DEL, .fieldValues = {}}; + arr = {.len = 2, .data = arr_data}; + + if (flag == 0) + for (uint64_t i = 0; i < arr.len; i++) + SWSSZmqProducerStateTable_del(pst, arr.data[i].key); + else + SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); + + ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA); + arr = SWSSZmqConsumerStateTable_pops(cst); + + kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 2); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey3"); + EXPECT_EQ(kfvOp(kfvs[0]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[0]).size(), 0); + EXPECT_EQ(kfvKey(kfvs[1]), "mykey4"); + EXPECT_EQ(kfvOp(kfvs[1]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[1]).size(), 0); + } + + // Server must be freed first to safely release message handlers (ZmqConsumerStateTable) + SWSSZmqServer_free(srv); + + SWSSZmqProducerStateTable_free(pst); + SWSSZmqConsumerStateTable_free(cst); + + SWSSZmqClient_free(cli); + + SWSSDBConnector_flushdb(db); + SWSSDBConnector_free(db); +} diff --git a/tests/main.cpp b/tests/main.cpp index 6cbaf251..440978a4 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -1,5 +1,6 @@ #include "gtest/gtest.h" #include "common/dbconnector.h" +#include "common/c-api/util.h" #include using namespace std; @@ -84,6 +85,9 @@ class SwsscommonEnvironment : public ::testing::Environment { SonicDBConfig::initializeGlobalConfig(global_existing_file); cout<<"INIT: load global db config file, isInit = "< +#include "gtest/gtest.h" +#include + +using namespace std; + +#define PRINT_ALL 1 + +TEST(PerformancetimerTest, basic) +{ + std::string expected; + + static swss::PerformanceTimer timer("basic", PRINT_ALL); + timer.start(); + this_thread::sleep_for(chrono::milliseconds(100)); + timer.stop(); + std::string output = timer.inc(1000); + + expected = R"({"API":"basic","RPS[k]":10.0,"Tasks":1000,"Total[ms]":100,"busy[ms]":100,"idle[ms]":0})"; + EXPECT_EQ(output, expected); + + timer.setTimerName("basic_set_name"); + timer.setTimerVerbose(true); + timer.setTimerThreshold(3000); + + timer.start(); + this_thread::sleep_for(chrono::milliseconds(100)); + timer.stop(); + output = timer.inc(1000); + EXPECT_EQ(output, ""); + + this_thread::sleep_for(chrono::milliseconds(200)); + + timer.start(); + this_thread::sleep_for(chrono::milliseconds(300)); + timer.stop(); + output = timer.inc(2000); + + expected = R"({"API":"basic_set_name","RPS[k]":5.0,"Tasks":3000,"Total[ms]":600,"busy[ms]":400,"idle[ms]":200,"m_gaps":[0,200],"m_incs":[1000,2000],"m_intervals":[100,300]})"; + + EXPECT_EQ(output, expected); +} diff --git a/tests/redis_piped_state_ut.cpp b/tests/redis_piped_state_ut.cpp index ca329190..f3173876 100644 --- a/tests/redis_piped_state_ut.cpp +++ b/tests/redis_piped_state_ut.cpp @@ -730,3 +730,59 @@ TEST(ConsumerStateTable, async_multitable) cout << endl << "Done." << endl; } + +TEST(ConsumerStateTable, flushPub) +{ + clearDB(); + + /* Prepare producer */ + int index = 0; + string tableName = "UT_REDIS_THREAD_" + to_string(index); + DBConnector db(TEST_DB, 0, true); + RedisPipeline pipeline(&db); + ProducerStateTable p(&pipeline, tableName, false, true); + p.setBuffered(true); + + string key = "TheKey"; + int maxNumOfFields = 2; + + /* Set operation */ + { + vector fields; + for (int j = 0; j < maxNumOfFields; j++) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key, fields); + } + + /* Del operation */ + p.del(key); + p.flush(); + + /* Prepare consumer */ + ConsumerStateTable c(&db, tableName); + Select cs; + Selectable *selectcs; + cs.addSelectable(&c); + + /* First pop operation */ + { + int ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + KeyOpFieldsValuesTuple kco; + c.pop(kco); + EXPECT_EQ(kfvKey(kco), key); + EXPECT_EQ(kfvOp(kco), "DEL"); + + auto fvs = kfvFieldsValues(kco); + EXPECT_EQ(fvs.size(), 0U); + } + + /* Second select operation */ + { + int ret = cs.select(&selectcs, 1000); + EXPECT_EQ(ret, Select::TIMEOUT); + } +} \ No newline at end of file diff --git a/tests/redis_ut.cpp b/tests/redis_ut.cpp index 4f691e88..f53f891d 100644 --- a/tests/redis_ut.cpp +++ b/tests/redis_ut.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include "gtest/gtest.h" #include "common/dbconnector.h" #include "common/producertable.h" @@ -20,6 +22,7 @@ using namespace std; using namespace swss; +using namespace testing; #define NUMBER_OF_THREADS (64) // Spawning more than 256 threads causes libc++ to except #define NUMBER_OF_OPS (1000) @@ -1139,3 +1142,32 @@ TEST(Connector, hmset) // test empty multi hash db.hmset({}); } + +TEST(Connector, connectFail) +{ + // connect to an ip which is not a redis server + EXPECT_THROW({ + try + { + DBConnector db(0, "1.1.1.1", 6379, 1); + } + catch(const std::system_error& e) + { + EXPECT_THAT(e.what(), HasSubstr("Unable to connect to redis - ")); + throw; + } + }, std::system_error); + + // connect to an invalid unix socket address + EXPECT_THROW({ + try + { + DBConnector db(0, "/tmp/invalid", 1); + } + catch(const std::system_error& e) + { + EXPECT_THAT(e.what(), HasSubstr("Unable to connect to redis (unix-socket) - ")); + throw; + } + }, std::system_error); +} diff --git a/tests/saiaclschema_ut.cpp b/tests/saiaclschema_ut.cpp index fff9158d..1f828f77 100644 --- a/tests/saiaclschema_ut.cpp +++ b/tests/saiaclschema_ut.cpp @@ -60,6 +60,37 @@ TEST(SaiAclSchemaTest, ActionSchemaByNameSucceeds) AllOf(Field(&ActionSchema::format, Format::kHexString), Field(&ActionSchema::bitwidth, 12))); } +TEST(SaiAclSchemaTest, ActionSchemaByNameAndObjectTypeSucceeds) { + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", + "SAI_OBJECT_TYPE_IPMC_GROUP"), + AllOf(Field(&ActionSchema::format, Format::kHexString), + Field(&ActionSchema::bitwidth, 16))); + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", + "SAI_OBJECT_TYPE_L2MC_GROUP"), + AllOf(Field(&ActionSchema::format, Format::kHexString), + Field(&ActionSchema::bitwidth, 16))); + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", + "SAI_OBJECT_TYPE_NEXT_HOP"), + AllOf(Field(&ActionSchema::format, Format::kString), + Field(&ActionSchema::bitwidth, 0))); + EXPECT_THAT(ActionSchemaByNameAndObjectType( + "SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", "SAI_OBJECT_TYPE_PORT"), + AllOf(Field(&ActionSchema::format, Format::kString), + Field(&ActionSchema::bitwidth, 0))); +} + +TEST(SaiAclSchemaTest, + ActionSchemaByNameAndObjectTypeWithNonRedirectActionSucceeds) { + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_DECREMENT_TTL", + "SAI_OBJECT_TYPE_UNKNOWN"), + AllOf(Field(&ActionSchema::format, Format::kHexString), + Field(&ActionSchema::bitwidth, 1))); +} + // Invalid Lookup Tests TEST(SaiAclSchemaTest, InvalidFormatNameThrowsException) @@ -82,6 +113,11 @@ TEST(SaiAclSchemaTest, InvalidActionNameThrowsException) EXPECT_THROW(ActionSchemaByName("Foo"), std::invalid_argument); } +TEST(SaiAclSchemaTest, InvalidActionNameAndObjectTypeThrowsException) { + EXPECT_THROW(ActionSchemaByNameAndObjectType("Foo", "unknown"), + std::invalid_argument); +} + } // namespace } // namespace acl } // namespace swss diff --git a/tests/test_interface.py b/tests/test_interface.py new file mode 100644 index 00000000..25c809ce --- /dev/null +++ b/tests/test_interface.py @@ -0,0 +1,8 @@ +from swsscommon import swsscommon + +def test_is_interface_name_valid(): + invalid_interface_name = "TooLongInterfaceName" + assert not swsscommon.isInterfaceNameValid(invalid_interface_name) + + validInterfaceName = "OkInterfaceName" + assert swsscommon.isInterfaceNameValid(validInterfaceName) diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 3817fb52..c4dcc748 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -444,3 +444,30 @@ TEST(ZmqConsumerStateTableBatchBufferOverflow, test) } EXPECT_ANY_THROW(p.send(kcos)); } + +TEST(ZmqProducerStateTableDeleteAfterSend, test) +{ + std::string testTableName = "ZMQ_PROD_DELETE_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + std::string testKey = "testKey"; + + ZmqServer server(pullEndpoint); + + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint); + + auto *p = new ZmqProducerStateTable(&db, testTableName, client, true); + std::vector values; + FieldValueTuple t("test", "test"); + values.push_back(t); + p->set(testKey,values); + delete p; + + sleep(1); + + Table table(&db, testTableName); + std::vector keys; + table.getKeys(keys); + EXPECT_EQ(keys.front(), testKey); +}