diff --git a/syncd/ConcurrentQueue.h b/syncd/ConcurrentQueue.h new file mode 100644 index 000000000..f7f011154 --- /dev/null +++ b/syncd/ConcurrentQueue.h @@ -0,0 +1,107 @@ +#pragma once + +#include +#include + +#include "swss/logger.h" +#include "swss/sal.h" + +namespace syncd +{ + template + class ConcurrentQueue + { + public: + + explicit ConcurrentQueue( + _In_ size_t queueSizeLimit = UNLIMITED); + + virtual ~ConcurrentQueue() = default; + + bool enqueue( + _In_ const T& val); + + bool dequeue( + _Out_ T* valOut); + + size_t size(); + + bool empty(); + + private: + + // Queue size = 0 means there is no limit on queue size. + static constexpr size_t UNLIMITED = 0; + + std::mutex m_mutex; + std::queue m_queue; + size_t m_queueSizeLimit; + + ConcurrentQueue(const ConcurrentQueue&) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; + }; + + template + ConcurrentQueue::ConcurrentQueue( + _In_ size_t queueSizeLimit) + : m_queueSizeLimit(queueSizeLimit) + { + SWSS_LOG_ENTER(); + } + + template + bool ConcurrentQueue::enqueue( + _In_ const T& val) + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + // If the queue exceeds the limit, return false. + if ((m_queueSizeLimit == UNLIMITED) || (m_queue.size() < m_queueSizeLimit)) + { + m_queue.push(val); + return true; + } + + return false; + } + + template + bool ConcurrentQueue::dequeue( + _Out_ T* valOut) + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + if (m_queue.empty()) + { + return false; + } + + *valOut = m_queue.front(); + m_queue.pop(); + + return true; + } + + template + size_t ConcurrentQueue::size() + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + return m_queue.size(); + } + + template + bool ConcurrentQueue::empty() + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + return m_queue.empty(); + } +} // namespace syncd diff --git a/syncd/Makefile.am b/syncd/Makefile.am index d34131156..4982ea16c 100644 --- a/syncd/Makefile.am +++ b/syncd/Makefile.am @@ -34,6 +34,7 @@ libSyncd_a_SOURCES = \ NotificationQueue.cpp \ PortMap.cpp \ PortMapParser.cpp \ + PortStateChangeHandler.cpp \ RedisClient.cpp \ RedisNotificationProducer.cpp \ RequestShutdownCommandLineOptions.cpp \ diff --git a/syncd/PortStateChangeHandler.cpp b/syncd/PortStateChangeHandler.cpp new file mode 100644 index 000000000..16f7b18b6 --- /dev/null +++ b/syncd/PortStateChangeHandler.cpp @@ -0,0 +1,46 @@ +#include "PortStateChangeHandler.h" + +#include + +using namespace syncd; + +PortStateChangeHandler::PortStateChangeHandler( + _In_ std::shared_ptr portStateChangeEvent) + : m_portStateChangeEvent(portStateChangeEvent) +{ + SWSS_LOG_ENTER(); + + m_portStateChangeQueue = std::make_shared( + m_portStateChangeQueueSize); +} + +PortStateChangeHandler::~PortStateChangeHandler() +{ + SWSS_LOG_ENTER(); +} + +void PortStateChangeHandler::handlePortStateChangeNotification( + _In_ uint32_t count, + _In_ const sai_port_oper_status_notification_t *data) +{ + SWSS_LOG_ENTER(); + + if (m_portStateChangeEvent == nullptr) + { + SWSS_LOG_THROW("Unexpected error: port state change event is null."); + } + + for (uint32_t idx = 0; idx < count; ++idx) + { + if (m_portStateChangeQueue->enqueue(data[idx]) == false) + { + SWSS_LOG_ERROR( + "Unexpected error: failed to enqueue the port state change " + "notification."); + + return; + } + } + + m_portStateChangeEvent->notify(); +} diff --git a/syncd/PortStateChangeHandler.h b/syncd/PortStateChangeHandler.h new file mode 100644 index 000000000..931834f7b --- /dev/null +++ b/syncd/PortStateChangeHandler.h @@ -0,0 +1,59 @@ +#pragma once + +extern "C" { +#include "saimetadata.h" +} + +#include + +#include "ConcurrentQueue.h" +#include "swss/logger.h" +#include "swss/selectableevent.h" + +namespace syncd +{ + + // Class to handle the port state change callback from SAI. This consists a + // selectable event that will be used to send notification from producer thread + // to consumer thread, and a mutex protected concurrent queue to share the port + // state change notification data between producer and consumer threads. + class PortStateChangeHandler + { + public: + + using portOperStatusNotificationQueue = + ConcurrentQueue; + + PortStateChangeHandler( + _In_ std::shared_ptr portStateChangeEvent); + + virtual ~PortStateChangeHandler(); + + // Adds the port operational status notification data to a queue and generates a + // notification event. + void handlePortStateChangeNotification( + _In_ uint32_t count, + _In_ const sai_port_oper_status_notification_t *data); + + // Returns the shared pointer of the queue. + std::shared_ptr getQueue() const + { + SWSS_LOG_ENTER(); + + return m_portStateChangeQueue; + } + + private: + + // Choosing 4k max event queue size based on if we had 256 ports, it can + // accommodate on average 16 port events per ports in worst case. + static constexpr size_t m_portStateChangeQueueSize = 4096; + + // SelectableEvent for producer to generate the event and for consumer to + // listen on. + std::shared_ptr m_portStateChangeEvent; + + // Mutex protected queue to share the data between producer and consumer. + std::shared_ptr m_portStateChangeQueue; + }; +} // namespace syncd diff --git a/tests/aspell.en.pws b/tests/aspell.en.pws index ee79d3ae7..923b1ba8b 100644 --- a/tests/aspell.en.pws +++ b/tests/aspell.en.pws @@ -50,6 +50,7 @@ IPGs IPv Inseg KEYs +LLC LOGLEVEL LOOPBACK MACsec diff --git a/unittest/syncd/Makefile.am b/unittest/syncd/Makefile.am index 577614a19..27c301e25 100644 --- a/unittest/syncd/Makefile.am +++ b/unittest/syncd/Makefile.am @@ -9,12 +9,14 @@ tests_SOURCES = main.cpp \ MockableSaiInterface.cpp \ MockHelper.cpp \ TestCommandLineOptions.cpp \ + TestConcurrentQueue.cpp \ TestFlexCounter.cpp \ TestVirtualOidTranslator.cpp \ TestNotificationQueue.cpp \ TestNotificationProcessor.cpp \ TestNotificationHandler.cpp \ TestMdioIpcServer.cpp \ + TestPortStateChangeHandler.cpp \ TestVendorSai.cpp tests_CXXFLAGS = $(DBGFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS_COMMON) diff --git a/unittest/syncd/TestConcurrentQueue.cpp b/unittest/syncd/TestConcurrentQueue.cpp new file mode 100644 index 000000000..65b5e3764 --- /dev/null +++ b/unittest/syncd/TestConcurrentQueue.cpp @@ -0,0 +1,77 @@ +#include + +#include "ConcurrentQueue.h" + +using namespace syncd; + +class ConcurrentQueueTest : public ::testing::Test +{}; + +TEST_F(ConcurrentQueueTest, QueueIsEmpty) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + EXPECT_EQ(testQueue.size(), 0); +} + +TEST_F(ConcurrentQueueTest, EnqueueSucceeds) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + EXPECT_TRUE(testQueue.enqueue(1)); + EXPECT_FALSE(testQueue.empty()); + EXPECT_EQ(testQueue.size(), 1); +} + +TEST_F(ConcurrentQueueTest, EnqueueFailsIfQueueSizeLimitIsReached) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + int data = 1; + for (size_t idx = 0; idx < queueSize; ++idx) + { + SCOPED_TRACE(::testing::Message() << "Inserting data at index: " << idx); + EXPECT_TRUE(testQueue.enqueue(data++)); + EXPECT_EQ(testQueue.size(), idx + 1); + } + + EXPECT_EQ(testQueue.size(), queueSize); + + // Once queue is at maximum capacity, en-queuing next element will fail. + EXPECT_FALSE(testQueue.enqueue(data)); + EXPECT_EQ(testQueue.size(), queueSize); +} + +TEST_F(ConcurrentQueueTest, DequeueFailsIfQueueIsEmpty) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + int val; + EXPECT_FALSE(testQueue.dequeue(&val)); +} + +TEST_F(ConcurrentQueueTest, DequeueSucceeds) +{ + ConcurrentQueue testQueue; + EXPECT_TRUE(testQueue.empty()); + + constexpr int testValue = 56; + EXPECT_TRUE(testQueue.enqueue(testValue)); + EXPECT_EQ(testQueue.size(), 1); + + int val; + EXPECT_TRUE(testQueue.dequeue(&val)); + EXPECT_EQ(val, testValue); + EXPECT_TRUE(testQueue.empty()); +} diff --git a/unittest/syncd/TestPortStateChangeHandler.cpp b/unittest/syncd/TestPortStateChangeHandler.cpp new file mode 100644 index 000000000..2e32a3720 --- /dev/null +++ b/unittest/syncd/TestPortStateChangeHandler.cpp @@ -0,0 +1,60 @@ +#include "PortStateChangeHandler.h" + +#include + +using namespace syncd; + +constexpr size_t portStateChangeQueueSize = 4096; + +class PortStateChangeHandlerTest : public ::testing::Test +{ + protected: + PortStateChangeHandlerTest() + : m_portStateChangeHandler(std::make_shared()) + { + SWSS_LOG_ENTER(); + } + + ~PortStateChangeHandlerTest() override + { + SWSS_LOG_ENTER(); + } + + PortStateChangeHandler m_portStateChangeHandler; +}; + +TEST_F(PortStateChangeHandlerTest, VerifyGetQueue) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); +} + +TEST_F(PortStateChangeHandlerTest, + HandlePortStateChangeNotificationFailsOnEnqueuingData) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); + + // Insert enough data in the queue so it reaches its capacity. + sai_port_oper_status_notification_t operStatus[portStateChangeQueueSize]; + m_portStateChangeHandler.handlePortStateChangeNotification( + portStateChangeQueueSize, &operStatus[0]); + EXPECT_EQ(queue->size(), portStateChangeQueueSize); + + // Since queue is at its maximum capacity, adding a new element should cause + // insert failure and new element should not get added. + m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1, + &operStatus[0]); + EXPECT_EQ(queue->size(), portStateChangeQueueSize); +} + +TEST_F(PortStateChangeHandlerTest, HandlePortStateChangeNotificationSucceeds) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); + + sai_port_oper_status_notification_t operStatus; + m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1, + &operStatus); + EXPECT_EQ(queue->size(), 1); +}