Skip to content

Commit

Permalink
[Link event damping] Add generic concurrent queue for link event damp…
Browse files Browse the repository at this point in the history
…ing.

- This queue will be used to enqueue the port state change events by
  NotificationHandler and dequeued by syncd main thread and processed in
  link event damping logic.

HLD: sonic-net/SONiC#1071
  • Loading branch information
Ashish Singh committed Sep 29, 2023
1 parent c22b76b commit 2c9b338
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 0 deletions.
120 changes: 120 additions & 0 deletions syncd/ConcurrentQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <mutex>
#include <queue>

#include "swss/logger.h"

namespace syncd
{

template <class T>
class ConcurrentQueue
{
public:
explicit ConcurrentQueue(_In_ size_t queueSizeLimit = m_unlimited);

virtual ~ConcurrentQueue();

bool enqueue(_In_ const T& val);

bool dequeue(_Out_ T* val_out);

size_t getQueueSize();

bool empty();

private:
// If queue size is specified 0, that will mean there is no limit on queue
// size.
static constexpr size_t m_unlimited = 0;

std::mutex m_mutex;
std::queue<T> m_queue;
size_t m_queueSizeLimit;

ConcurrentQueue<T>(const ConcurrentQueue<T>&) = delete;
ConcurrentQueue<T>& operator=(const ConcurrentQueue<T>&) = delete;
};

template <class T>
ConcurrentQueue<T>::ConcurrentQueue(_In_ size_t queueSizeLimit)
: m_queueSizeLimit(queueSizeLimit)
{
SWSS_LOG_ENTER();
}

template <class T>
ConcurrentQueue<T>::~ConcurrentQueue()
{
SWSS_LOG_ENTER();
}

template <class T>
bool ConcurrentQueue<T>::enqueue(_In_ const T& val)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

// If the queue exceeds the limit, return false.
if ((m_queueSizeLimit == m_unlimited) || (m_queue.size() < m_queueSizeLimit))
{
m_queue.push(val);
return true;
}

return false;
}

template <class T>
bool ConcurrentQueue<T>::dequeue(_Out_ T* val_out)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);
if (m_queue.empty())
{
return false;
}

*val_out = m_queue.front();
m_queue.pop();

return true;
}

template <class T>
size_t ConcurrentQueue<T>::getQueueSize()
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

return m_queue.size();
}

template <class T>
bool ConcurrentQueue<T>::empty()
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

return m_queue.empty();
}
} // namespace syncd
1 change: 1 addition & 0 deletions unittest/syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ tests_SOURCES = main.cpp \
MockableSaiInterface.cpp \
MockHelper.cpp \
TestCommandLineOptions.cpp \
TestConcurrentQueue.cpp \
TestFlexCounter.cpp \
TestVirtualOidTranslator.cpp \
TestNotificationQueue.cpp \
Expand Down
89 changes: 89 additions & 0 deletions unittest/syncd/TestConcurrentQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include "ConcurrentQueue.h"

using namespace syncd;

class ConcurrentQueueTest : public ::testing::Test
{};

TEST_F(ConcurrentQueueTest, QueueIsEmpty) {
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());
EXPECT_EQ(testQueue.getQueueSize(), 0);
}

TEST_F(ConcurrentQueueTest, EnqueueSucceeds) {
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

// Enqueue one element.
EXPECT_TRUE(testQueue.enqueue(1));
EXPECT_FALSE(testQueue.empty());
EXPECT_EQ(testQueue.getQueueSize(), 1);
}

// Once queue is at maximum capacity, enqueuing next element will fail.
TEST_F(ConcurrentQueueTest, EnqueueFailsIfQueueSizeLimitIsReached) {
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.Empty());

int data = 1;
for (size_t idx = 0; idx < queueSize; ++idx)
{
SCOPED_TRACE(::testing::Message() << "Inserting data at index: " << idx);
EXPECT_TRUE(testQueue.enqueue(data++));
EXPECT_EQ(testQueue.getQueueSize(), idx + 1);
}

EXPECT_EQ(testQueue.getQueueSize(), queueSize);

// Once queue is at maximum capacity, enqueuing next element will fail.
EXPECT_FALSE(testQueue.enqueue(data));
EXPECT_EQ(testQueue.getQueueSize(), queueSize);
}

TEST_F(ConcurrentQueueTest, DequeueFailsIfQueueisEmpty) {
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

int val;
EXPECT_FALSE(testQueue.dequeue(&val));
}

TEST_F(ConcurrentQueueTest, DequeueSucceeds) {
ConcurrentQueue<int> testQueue;
EXPECT_TRUE(testQueue.empty());

constexpr int testValue = 56;
EXPECT_TRUE(testQueue.enqueue(testValue));
EXPECT_EQ(testQueue.getQueueSize(), 1);

// Dequeue the value.
int val;
EXPECT_TRUE(testQueue.dequeue(&val));
EXPECT_EQ(val, testValue);
EXPECT_TRUE(testQueue.empty());
}

0 comments on commit 2c9b338

Please sign in to comment.