diff --git a/syncd/ConcurrentQueue.h b/syncd/ConcurrentQueue.h new file mode 100644 index 000000000..b5cc93818 --- /dev/null +++ b/syncd/ConcurrentQueue.h @@ -0,0 +1,121 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "swss/logger.h" +#include "swss/sal.h" + +namespace syncd +{ + template + class ConcurrentQueue + { + public: + + explicit ConcurrentQueue( + _In_ size_t queueSizeLimit = UNLIMITED); + + virtual ~ConcurrentQueue() = default; + + bool enqueue( + _In_ const T& val); + + bool dequeue( + _Out_ T* valOut); + + size_t size(); + + bool empty(); + + private: + + // Queue size = 0 means there is no limit on queue size. + static constexpr size_t UNLIMITED = 0; + + std::mutex m_mutex; + std::queue m_queue; + size_t m_queueSizeLimit; + + ConcurrentQueue(const ConcurrentQueue&) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; + }; + + template + ConcurrentQueue::ConcurrentQueue( + _In_ size_t queueSizeLimit) + : m_queueSizeLimit(queueSizeLimit) + { + SWSS_LOG_ENTER(); + } + + template + bool ConcurrentQueue::enqueue( + _In_ const T& val) + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + // If the queue exceeds the limit, return false. + if ((m_queueSizeLimit == UNLIMITED) || (m_queue.size() < m_queueSizeLimit)) + { + m_queue.push(val); + return true; + } + + return false; + } + + template + bool ConcurrentQueue::dequeue( + _Out_ T* valOut) + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + if (m_queue.empty()) + { + return false; + } + + *valOut = m_queue.front(); + m_queue.pop(); + + return true; + } + + template + size_t ConcurrentQueue::size() + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + return m_queue.size(); + } + + template + bool ConcurrentQueue::empty() + { + SWSS_LOG_ENTER(); + + std::lock_guard mutex_lock(m_mutex); + + return m_queue.empty(); + } +} // namespace syncd diff --git a/unittest/syncd/Makefile.am b/unittest/syncd/Makefile.am index 577614a19..4e02f5172 100644 --- a/unittest/syncd/Makefile.am +++ b/unittest/syncd/Makefile.am @@ -9,6 +9,7 @@ tests_SOURCES = main.cpp \ MockableSaiInterface.cpp \ MockHelper.cpp \ TestCommandLineOptions.cpp \ + TestConcurrentQueue.cpp \ TestFlexCounter.cpp \ TestVirtualOidTranslator.cpp \ TestNotificationQueue.cpp \ diff --git a/unittest/syncd/TestConcurrentQueue.cpp b/unittest/syncd/TestConcurrentQueue.cpp new file mode 100644 index 000000000..e7eb84863 --- /dev/null +++ b/unittest/syncd/TestConcurrentQueue.cpp @@ -0,0 +1,91 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "ConcurrentQueue.h" + +using namespace syncd; + +class ConcurrentQueueTest : public ::testing::Test +{}; + +TEST_F(ConcurrentQueueTest, QueueIsEmpty) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + EXPECT_EQ(testQueue.size(), 0); +} + +TEST_F(ConcurrentQueueTest, EnqueueSucceeds) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + EXPECT_TRUE(testQueue.enqueue(1)); + EXPECT_FALSE(testQueue.empty()); + EXPECT_EQ(testQueue.size(), 1); +} + +TEST_F(ConcurrentQueueTest, EnqueueFailsIfQueueSizeLimitIsReached) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + int data = 1; + for (size_t idx = 0; idx < queueSize; ++idx) + { + SCOPED_TRACE(::testing::Message() << "Inserting data at index: " << idx); + EXPECT_TRUE(testQueue.enqueue(data++)); + EXPECT_EQ(testQueue.size(), idx + 1); + } + + EXPECT_EQ(testQueue.size(), queueSize); + + // Once queue is at maximum capacity, en-queuing next element will fail. + EXPECT_FALSE(testQueue.enqueue(data)); + EXPECT_EQ(testQueue.size(), queueSize); +} + +TEST_F(ConcurrentQueueTest, DequeueFailsIfQueueIsEmpty) +{ + constexpr size_t queueSize = 5; + ConcurrentQueue testQueue(queueSize); + + EXPECT_TRUE(testQueue.empty()); + + int val; + EXPECT_FALSE(testQueue.dequeue(&val)); +} + +TEST_F(ConcurrentQueueTest, DequeueSucceeds) +{ + ConcurrentQueue testQueue; + EXPECT_TRUE(testQueue.empty()); + + constexpr int testValue = 56; + EXPECT_TRUE(testQueue.enqueue(testValue)); + EXPECT_EQ(testQueue.size(), 1); + + int val; + EXPECT_TRUE(testQueue.dequeue(&val)); + EXPECT_EQ(val, testValue); + EXPECT_TRUE(testQueue.empty()); +}