Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Link Event Damping] Port state change handler class. #2

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions syncd/ConcurrentQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#pragma once

#include <mutex>
#include <queue>

#include "swss/logger.h"
#include "swss/sal.h"

namespace syncd
{
template <class T>
class ConcurrentQueue
{
public:

explicit ConcurrentQueue(
_In_ size_t queueSizeLimit = UNLIMITED);

virtual ~ConcurrentQueue() = default;

bool enqueue(
_In_ const T& val);

bool dequeue(
_Out_ T* valOut);

size_t size();

bool empty();

private:

// Queue size = 0 means there is no limit on queue size.
static constexpr size_t UNLIMITED = 0;

std::mutex m_mutex;
std::queue<T> m_queue;
size_t m_queueSizeLimit;

ConcurrentQueue<T>(const ConcurrentQueue<T>&) = delete;
ConcurrentQueue<T>& operator=(const ConcurrentQueue<T>&) = delete;
};

template <class T>
ConcurrentQueue<T>::ConcurrentQueue(
_In_ size_t queueSizeLimit)
: m_queueSizeLimit(queueSizeLimit)
{
SWSS_LOG_ENTER();
}

template <class T>
bool ConcurrentQueue<T>::enqueue(
_In_ const T& val)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

// If the queue exceeds the limit, return false.
if ((m_queueSizeLimit == UNLIMITED) || (m_queue.size() < m_queueSizeLimit))
{
m_queue.push(val);
return true;
}

return false;
}

template <class T>
bool ConcurrentQueue<T>::dequeue(
_Out_ T* valOut)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);
if (m_queue.empty())
{
return false;
}

*valOut = m_queue.front();
m_queue.pop();

return true;
}

template <class T>
size_t ConcurrentQueue<T>::size()
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

return m_queue.size();
}

template <class T>
bool ConcurrentQueue<T>::empty()
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

return m_queue.empty();
}
} // namespace syncd
1 change: 1 addition & 0 deletions syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ libSyncd_a_SOURCES = \
NotificationQueue.cpp \
PortMap.cpp \
PortMapParser.cpp \
PortStateChangeHandler.cpp \
RedisClient.cpp \
RedisNotificationProducer.cpp \
RequestShutdownCommandLineOptions.cpp \
Expand Down
46 changes: 46 additions & 0 deletions syncd/PortStateChangeHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "PortStateChangeHandler.h"

#include <string>

using namespace syncd;

PortStateChangeHandler::PortStateChangeHandler(
_In_ std::shared_ptr<swss::SelectableEvent> portStateChangeEvent)
: m_portStateChangeEvent(portStateChangeEvent)
{
SWSS_LOG_ENTER();

m_portStateChangeQueue = std::make_shared<portOperStatusNotificationQueue>(
m_portStateChangeQueueSize);
}

PortStateChangeHandler::~PortStateChangeHandler()
{
SWSS_LOG_ENTER();
}

void PortStateChangeHandler::handlePortStateChangeNotification(
_In_ uint32_t count,
_In_ const sai_port_oper_status_notification_t *data)
{
SWSS_LOG_ENTER();

if (m_portStateChangeEvent == nullptr)
{
SWSS_LOG_THROW("Unexpected error: port state change event is null.");
}

for (uint32_t idx = 0; idx < count; ++idx)
{
if (m_portStateChangeQueue->enqueue(data[idx]) == false)
{
SWSS_LOG_ERROR(
"Unexpected error: failed to enqueue the port state change "
"notification.");

return;
}
}

m_portStateChangeEvent->notify();
}
59 changes: 59 additions & 0 deletions syncd/PortStateChangeHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once

extern "C" {
#include "saimetadata.h"
}

#include <memory>

#include "ConcurrentQueue.h"
#include "swss/logger.h"
#include "swss/selectableevent.h"

namespace syncd
{

// Class to handle the port state change callback from SAI. This consists a
// selectable event that will be used to send notification from producer thread
// to consumer thread, and a mutex protected concurrent queue to share the port
// state change notification data between producer and consumer threads.
class PortStateChangeHandler
{
public:

using portOperStatusNotificationQueue =
ConcurrentQueue<sai_port_oper_status_notification_t>;

PortStateChangeHandler(
_In_ std::shared_ptr<swss::SelectableEvent> portStateChangeEvent);

virtual ~PortStateChangeHandler();

// Adds the port operational status notification data to a queue and generates a
// notification event.
void handlePortStateChangeNotification(
_In_ uint32_t count,
_In_ const sai_port_oper_status_notification_t *data);

// Returns the shared pointer of the queue.
std::shared_ptr<portOperStatusNotificationQueue> getQueue() const
{
SWSS_LOG_ENTER();

return m_portStateChangeQueue;
}

private:

// Choosing 4k max event queue size based on if we had 256 ports, it can
// accommodate on average 16 port events per ports in worst case.
static constexpr size_t m_portStateChangeQueueSize = 4096;

// SelectableEvent for producer to generate the event and for consumer to
// listen on.
std::shared_ptr<swss::SelectableEvent> m_portStateChangeEvent;

// Mutex protected queue to share the data between producer and consumer.
std::shared_ptr<portOperStatusNotificationQueue> m_portStateChangeQueue;
};
} // namespace syncd
1 change: 1 addition & 0 deletions tests/aspell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ IPGs
IPv
Inseg
KEYs
LLC
LOGLEVEL
LOOPBACK
MACsec
Expand Down
2 changes: 2 additions & 0 deletions unittest/syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ tests_SOURCES = main.cpp \
MockableSaiInterface.cpp \
MockHelper.cpp \
TestCommandLineOptions.cpp \
TestConcurrentQueue.cpp \
TestFlexCounter.cpp \
TestVirtualOidTranslator.cpp \
TestNotificationQueue.cpp \
TestNotificationProcessor.cpp \
TestNotificationHandler.cpp \
TestMdioIpcServer.cpp \
TestPortStateChangeHandler.cpp \
TestVendorSai.cpp

tests_CXXFLAGS = $(DBGFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS_COMMON)
Expand Down
77 changes: 77 additions & 0 deletions unittest/syncd/TestConcurrentQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <gtest/gtest.h>

#include "ConcurrentQueue.h"

using namespace syncd;

class ConcurrentQueueTest : public ::testing::Test
{};

TEST_F(ConcurrentQueueTest, QueueIsEmpty)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());
EXPECT_EQ(testQueue.size(), 0);
}

TEST_F(ConcurrentQueueTest, EnqueueSucceeds)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

EXPECT_TRUE(testQueue.enqueue(1));
EXPECT_FALSE(testQueue.empty());
EXPECT_EQ(testQueue.size(), 1);
}

TEST_F(ConcurrentQueueTest, EnqueueFailsIfQueueSizeLimitIsReached)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

int data = 1;
for (size_t idx = 0; idx < queueSize; ++idx)
{
SCOPED_TRACE(::testing::Message() << "Inserting data at index: " << idx);
EXPECT_TRUE(testQueue.enqueue(data++));
EXPECT_EQ(testQueue.size(), idx + 1);
}

EXPECT_EQ(testQueue.size(), queueSize);

// Once queue is at maximum capacity, en-queuing next element will fail.
EXPECT_FALSE(testQueue.enqueue(data));
EXPECT_EQ(testQueue.size(), queueSize);
}

TEST_F(ConcurrentQueueTest, DequeueFailsIfQueueIsEmpty)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

int val;
EXPECT_FALSE(testQueue.dequeue(&val));
}

TEST_F(ConcurrentQueueTest, DequeueSucceeds)
{
ConcurrentQueue<int> testQueue;
EXPECT_TRUE(testQueue.empty());

constexpr int testValue = 56;
EXPECT_TRUE(testQueue.enqueue(testValue));
EXPECT_EQ(testQueue.size(), 1);

int val;
EXPECT_TRUE(testQueue.dequeue(&val));
EXPECT_EQ(val, testValue);
EXPECT_TRUE(testQueue.empty());
}
60 changes: 60 additions & 0 deletions unittest/syncd/TestPortStateChangeHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include "PortStateChangeHandler.h"

#include <gtest/gtest.h>

using namespace syncd;

constexpr size_t portStateChangeQueueSize = 4096;

class PortStateChangeHandlerTest : public ::testing::Test
{
protected:
PortStateChangeHandlerTest()
: m_portStateChangeHandler(std::make_shared<swss::SelectableEvent>())
{
SWSS_LOG_ENTER();
}

~PortStateChangeHandlerTest() override
{
SWSS_LOG_ENTER();
}

PortStateChangeHandler m_portStateChangeHandler;
};

TEST_F(PortStateChangeHandlerTest, VerifyGetQueue)
{
auto queue = m_portStateChangeHandler.getQueue();
EXPECT_EQ(queue->size(), 0);
}

TEST_F(PortStateChangeHandlerTest,
HandlePortStateChangeNotificationFailsOnEnqueuingData)
{
auto queue = m_portStateChangeHandler.getQueue();
EXPECT_EQ(queue->size(), 0);

// Insert enough data in the queue so it reaches its capacity.
sai_port_oper_status_notification_t operStatus[portStateChangeQueueSize];
m_portStateChangeHandler.handlePortStateChangeNotification(
portStateChangeQueueSize, &operStatus[0]);
EXPECT_EQ(queue->size(), portStateChangeQueueSize);

// Since queue is at its maximum capacity, adding a new element should cause
// insert failure and new element should not get added.
m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1,
&operStatus[0]);
EXPECT_EQ(queue->size(), portStateChangeQueueSize);
}

TEST_F(PortStateChangeHandlerTest, HandlePortStateChangeNotificationSucceeds)
{
auto queue = m_portStateChangeHandler.getQueue();
EXPECT_EQ(queue->size(), 0);

sai_port_oper_status_notification_t operStatus;
m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1,
&operStatus);
EXPECT_EQ(queue->size(), 1);
}