diff --git a/syncd/Makefile.am b/syncd/Makefile.am index d34131156..4982ea16c 100644 --- a/syncd/Makefile.am +++ b/syncd/Makefile.am @@ -34,6 +34,7 @@ libSyncd_a_SOURCES = \ NotificationQueue.cpp \ PortMap.cpp \ PortMapParser.cpp \ + PortStateChangeHandler.cpp \ RedisClient.cpp \ RedisNotificationProducer.cpp \ RequestShutdownCommandLineOptions.cpp \ diff --git a/syncd/PortStateChangeHandler.cpp b/syncd/PortStateChangeHandler.cpp new file mode 100644 index 000000000..16f7b18b6 --- /dev/null +++ b/syncd/PortStateChangeHandler.cpp @@ -0,0 +1,46 @@ +#include "PortStateChangeHandler.h" + +#include + +using namespace syncd; + +PortStateChangeHandler::PortStateChangeHandler( + _In_ std::shared_ptr portStateChangeEvent) + : m_portStateChangeEvent(portStateChangeEvent) +{ + SWSS_LOG_ENTER(); + + m_portStateChangeQueue = std::make_shared( + m_portStateChangeQueueSize); +} + +PortStateChangeHandler::~PortStateChangeHandler() +{ + SWSS_LOG_ENTER(); +} + +void PortStateChangeHandler::handlePortStateChangeNotification( + _In_ uint32_t count, + _In_ const sai_port_oper_status_notification_t *data) +{ + SWSS_LOG_ENTER(); + + if (m_portStateChangeEvent == nullptr) + { + SWSS_LOG_THROW("Unexpected error: port state change event is null."); + } + + for (uint32_t idx = 0; idx < count; ++idx) + { + if (m_portStateChangeQueue->enqueue(data[idx]) == false) + { + SWSS_LOG_ERROR( + "Unexpected error: failed to enqueue the port state change " + "notification."); + + return; + } + } + + m_portStateChangeEvent->notify(); +} diff --git a/syncd/PortStateChangeHandler.h b/syncd/PortStateChangeHandler.h new file mode 100644 index 000000000..931834f7b --- /dev/null +++ b/syncd/PortStateChangeHandler.h @@ -0,0 +1,59 @@ +#pragma once + +extern "C" { +#include "saimetadata.h" +} + +#include + +#include "ConcurrentQueue.h" +#include "swss/logger.h" +#include "swss/selectableevent.h" + +namespace syncd +{ + + // Class to handle the port state change callback from SAI. This consists a + // selectable event that will be used to send notification from producer thread + // to consumer thread, and a mutex protected concurrent queue to share the port + // state change notification data between producer and consumer threads. + class PortStateChangeHandler + { + public: + + using portOperStatusNotificationQueue = + ConcurrentQueue; + + PortStateChangeHandler( + _In_ std::shared_ptr portStateChangeEvent); + + virtual ~PortStateChangeHandler(); + + // Adds the port operational status notification data to a queue and generates a + // notification event. + void handlePortStateChangeNotification( + _In_ uint32_t count, + _In_ const sai_port_oper_status_notification_t *data); + + // Returns the shared pointer of the queue. + std::shared_ptr getQueue() const + { + SWSS_LOG_ENTER(); + + return m_portStateChangeQueue; + } + + private: + + // Choosing 4k max event queue size based on if we had 256 ports, it can + // accommodate on average 16 port events per ports in worst case. + static constexpr size_t m_portStateChangeQueueSize = 4096; + + // SelectableEvent for producer to generate the event and for consumer to + // listen on. + std::shared_ptr m_portStateChangeEvent; + + // Mutex protected queue to share the data between producer and consumer. + std::shared_ptr m_portStateChangeQueue; + }; +} // namespace syncd diff --git a/unittest/syncd/Makefile.am b/unittest/syncd/Makefile.am index 4e02f5172..27c301e25 100644 --- a/unittest/syncd/Makefile.am +++ b/unittest/syncd/Makefile.am @@ -16,6 +16,7 @@ tests_SOURCES = main.cpp \ TestNotificationProcessor.cpp \ TestNotificationHandler.cpp \ TestMdioIpcServer.cpp \ + TestPortStateChangeHandler.cpp \ TestVendorSai.cpp tests_CXXFLAGS = $(DBGFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS_COMMON) diff --git a/unittest/syncd/TestPortStateChangeHandler.cpp b/unittest/syncd/TestPortStateChangeHandler.cpp new file mode 100644 index 000000000..2e32a3720 --- /dev/null +++ b/unittest/syncd/TestPortStateChangeHandler.cpp @@ -0,0 +1,60 @@ +#include "PortStateChangeHandler.h" + +#include + +using namespace syncd; + +constexpr size_t portStateChangeQueueSize = 4096; + +class PortStateChangeHandlerTest : public ::testing::Test +{ + protected: + PortStateChangeHandlerTest() + : m_portStateChangeHandler(std::make_shared()) + { + SWSS_LOG_ENTER(); + } + + ~PortStateChangeHandlerTest() override + { + SWSS_LOG_ENTER(); + } + + PortStateChangeHandler m_portStateChangeHandler; +}; + +TEST_F(PortStateChangeHandlerTest, VerifyGetQueue) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); +} + +TEST_F(PortStateChangeHandlerTest, + HandlePortStateChangeNotificationFailsOnEnqueuingData) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); + + // Insert enough data in the queue so it reaches its capacity. + sai_port_oper_status_notification_t operStatus[portStateChangeQueueSize]; + m_portStateChangeHandler.handlePortStateChangeNotification( + portStateChangeQueueSize, &operStatus[0]); + EXPECT_EQ(queue->size(), portStateChangeQueueSize); + + // Since queue is at its maximum capacity, adding a new element should cause + // insert failure and new element should not get added. + m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1, + &operStatus[0]); + EXPECT_EQ(queue->size(), portStateChangeQueueSize); +} + +TEST_F(PortStateChangeHandlerTest, HandlePortStateChangeNotificationSucceeds) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); + + sai_port_oper_status_notification_t operStatus; + m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1, + &operStatus); + EXPECT_EQ(queue->size(), 1); +}