From 45cf9e2258057dfcb8bd37e2b2d4c48542deaf95 Mon Sep 17 00:00:00 2001 From: divyagayathri-hcl Date: Tue, 5 Nov 2024 22:13:10 -0800 Subject: [PATCH] zmq wait --- common/zmqclient.cpp | 62 ++++++++++++++++++++++++++++++++ common/zmqclient.h | 10 ++++++ common/zmqproducerstatetable.cpp | 12 +++++++ common/zmqproducerstatetable.h | 8 +++++ common/zmqserver.cpp | 19 +++++++--- common/zmqserver.h | 7 ++++ 6 files changed, 113 insertions(+), 5 deletions(-) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 0225d437..3ed5bcf7 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -197,4 +197,66 @@ void ZmqClient::sendMsg( throw system_error(make_error_code(errc::io_error), message); } +bool ZmqClient::wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos, + + std::vector& buffer) + +{ + + SWSS_LOG_ENTER(); + + int rc; + + for (int i = 0; true ; ++i) + + { + + rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0); + + if (rc < 0) + + { + + if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) + + { + + continue; + + } + + SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); + + } + + if (rc >= (int)buffer.size()) + + { + + SWSS_LOG_THROW( + + "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", + + (int)buffer.size(), rc); + + } + + break; + + } + + buffer.at(rc) = 0; // make sure that we end string with zero before parse + + kcos.clear(); + + BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos); + + return true; + +} + } diff --git a/common/zmqclient.h b/common/zmqclient.h index 313e6573..79b4d766 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -12,6 +12,7 @@ namespace swss { class ZmqClient { public: + ZmqClient(const std::string& endpoint); ZmqClient(const std::string& endpoint, const std::string& vrf); ~ZmqClient(); @@ -24,6 +25,15 @@ class ZmqClient const std::string& tableName, const std::vector& kcos, std::vector& sendbuffer); + + bool wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos, + + std::vector& buffer); + private: void initialize(const std::string& endpoint, const std::string& vrf); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index ec9396b3..c171163f 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -171,6 +171,18 @@ void ZmqProducerStateTable::send(const std::vector &kcos } } +bool ZmqProducerStateTable::wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos) + +{ + + return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer); + +} + size_t ZmqProducerStateTable::dbUpdaterQueueSize() { if (m_asyncDBUpdater == nullptr) diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 74910782..3c794237 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -37,6 +37,14 @@ class ZmqProducerStateTable : public ProducerStateTable // Batched send that can include both SET and DEL requests. virtual void send(const std::vector &kcos); + // This method should only be used if the ZmqClient enables one-to-one sync. + + virtual bool wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos); + size_t dbUpdaterQueueSize(); private: void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index dca10740..02ef377a 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -18,7 +18,8 @@ ZmqServer::ZmqServer(const std::string& endpoint) ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) : m_endpoint(endpoint), - m_vrf(vrf) + m_vrf(vrf), + m_allowZmqPoll(true) { m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; @@ -29,8 +30,14 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) ZmqServer::~ZmqServer() { + m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); + + zmq_close(m_socket); + + zmq_ctx_destroy(m_context); + } void ZmqServer::registerMessageHandler( @@ -115,13 +122,15 @@ void ZmqServer::mqPollThread() // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; - poll_item.socket = socket; + poll_item.socket = m_socket; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str()); while (m_runThread) { + m_allowZmqPoll = false; + // receive message rc = zmq_poll(&poll_item, 1, 1000); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) @@ -132,7 +141,7 @@ void ZmqServer::mqPollThread() } // receive message - rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); if (rc < 0) { int zmq_err = zmq_errno(); @@ -161,8 +170,8 @@ void ZmqServer::mqPollThread() handleReceivedData(m_buffer.data(), rc); } - zmq_close(socket); - zmq_ctx_destroy(context); + zmq_close(m_socket); + zmq_ctx_destroy(m_context); SWSS_LOG_NOTICE("mqPollThread end"); } diff --git a/common/zmqserver.h b/common/zmqserver.h index 8afe18d7..b8b14c1e 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -40,6 +40,7 @@ class ZmqServer ZmqMessageHandler* handler); private: + void handleReceivedData(const char* buffer, const size_t size); void mqPollThread(); @@ -56,6 +57,12 @@ class ZmqServer std::string m_vrf; + void* m_context; + + void* m_socket; + + bool m_allowZmqPoll; + std::map> m_HandlerMap; };