From 2c9b3386c3025de3eae05a681f0f62f8f4e32938 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 29 Sep 2023 13:03:45 -0700 Subject: [PATCH] [Link event damping] Add generic concurrent queue for link event damping. - This queue will be used to enqueue the port state change events by NotificationHandler and dequeued by syncd main thread and processed in link event damping logic. HLD: sonic-net/SONiC#1071 --- syncd/ConcurrentQueue.h | 120 +++++++++++++++++++++++++ unittest/syncd/Makefile.am | 1 + unittest/syncd/TestConcurrentQueue.cpp | 89 ++++++++++++++++++ 3 files changed, 210 insertions(+) create mode 100644 syncd/ConcurrentQueue.h create mode 100644 unittest/syncd/TestConcurrentQueue.cpp diff --git a/syncd/ConcurrentQueue.h b/syncd/ConcurrentQueue.h new file mode 100644 index 000000000..087356e31 --- /dev/null +++ b/syncd/ConcurrentQueue.h @@ -0,0 +1,120 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "swss/logger.h" + +namespace syncd +{ + + template + class ConcurrentQueue + { + public: + explicit ConcurrentQueue(_In_ size_t queueSizeLimit = m_unlimited); + + virtual ~ConcurrentQueue(); + + bool enqueue(_In_ const T& val); + + bool dequeue(_Out_ T* val_out); + + size_t getQueueSize(); + + bool empty(); + + private: + // If queue size is specified 0, that will mean there is no limit on queue + // size. + static constexpr size_t m_unlimited = 0; + + std::mutex m_mutex; + std::queue m_queue; + size_t m_queueSizeLimit; + + ConcurrentQueue(const ConcurrentQueue&) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; + }; + + template + ConcurrentQueue::ConcurrentQueue(_In_ size_t queueSizeLimit) + : m_queueSizeLimit(queueSizeLimit) + { + SWSS_LOG_ENTER(); + } + + template + ConcurrentQueue::~ConcurrentQueue() + { + SWSS_LOG_ENTER(); + } + + template + bool ConcurrentQueue::enqueue(_In_ const T& val) + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + // If the queue exceeds the limit, return false. + if ((m_queueSizeLimit == m_unlimited) || (m_queue.size() < m_queueSizeLimit)) + { + m_queue.push(val); + return true; + } + + return false; + } + + template + bool ConcurrentQueue::dequeue(_Out_ T* val_out) + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + if (m_queue.empty()) + { + return false; + } + + *val_out = m_queue.front(); + m_queue.pop(); + + return true; + } + + template + size_t ConcurrentQueue::getQueueSize() + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + return m_queue.size(); + } + + template + bool ConcurrentQueue::empty() + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + return m_queue.empty(); + } +} // namespace syncd diff --git a/unittest/syncd/Makefile.am b/unittest/syncd/Makefile.am index 577614a19..4e02f5172 100644 --- a/unittest/syncd/Makefile.am +++ b/unittest/syncd/Makefile.am @@ -9,6 +9,7 @@ tests_SOURCES = main.cpp \ MockableSaiInterface.cpp \ MockHelper.cpp \ TestCommandLineOptions.cpp \ + TestConcurrentQueue.cpp \ TestFlexCounter.cpp \ TestVirtualOidTranslator.cpp \ TestNotificationQueue.cpp \ diff --git a/unittest/syncd/TestConcurrentQueue.cpp b/unittest/syncd/TestConcurrentQueue.cpp new file mode 100644 index 000000000..af03c7d51 --- /dev/null +++ b/unittest/syncd/TestConcurrentQueue.cpp @@ -0,0 +1,89 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "ConcurrentQueue.h" + +using namespace syncd; + +class ConcurrentQueueTest : public ::testing::Test +{}; + +TEST_F(ConcurrentQueueTest, QueueIsEmpty) { + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + EXPECT_EQ(testQueue.getQueueSize(), 0); +} + +TEST_F(ConcurrentQueueTest, EnqueueSucceeds) { + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + // Enqueue one element. + EXPECT_TRUE(testQueue.enqueue(1)); + EXPECT_FALSE(testQueue.empty()); + EXPECT_EQ(testQueue.getQueueSize(), 1); +} + +// Once queue is at maximum capacity, enqueuing next element will fail. +TEST_F(ConcurrentQueueTest, EnqueueFailsIfQueueSizeLimitIsReached) { + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.Empty()); + + int data = 1; + for (size_t idx = 0; idx < queueSize; ++idx) + { + SCOPED_TRACE(::testing::Message() << "Inserting data at index: " << idx); + EXPECT_TRUE(testQueue.enqueue(data++)); + EXPECT_EQ(testQueue.getQueueSize(), idx + 1); + } + + EXPECT_EQ(testQueue.getQueueSize(), queueSize); + + // Once queue is at maximum capacity, enqueuing next element will fail. + EXPECT_FALSE(testQueue.enqueue(data)); + EXPECT_EQ(testQueue.getQueueSize(), queueSize); +} + +TEST_F(ConcurrentQueueTest, DequeueFailsIfQueueisEmpty) { + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + int val; + EXPECT_FALSE(testQueue.dequeue(&val)); +} + +TEST_F(ConcurrentQueueTest, DequeueSucceeds) { + ConcurrentQueue testQueue; + EXPECT_TRUE(testQueue.empty()); + + constexpr int testValue = 56; + EXPECT_TRUE(testQueue.enqueue(testValue)); + EXPECT_EQ(testQueue.getQueueSize(), 1); + + // Dequeue the value. + int val; + EXPECT_TRUE(testQueue.dequeue(&val)); + EXPECT_EQ(val, testValue); + EXPECT_TRUE(testQueue.empty()); +}