diff --git a/CMakeLists.txt b/CMakeLists.txt index 9695a73e..ec6a939d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,9 @@ if(BUILD_TESTING) ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp) target_link_libraries(realtime_buffer_tests ${PROJECT_NAME} ${GMOCK_MAIN_LIBRARIES}) + ament_add_gmock(realtime_barrier_tests test/realtime_barrier_tests.cpp) + target_link_libraries(realtime_barrier_tests ${PROJECT_NAME} ${GMOCK_MAIN_LIBRARIES}) + ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests ${PROJECT_NAME} ${GMOCK_MAIN_LIBRARIES}) diff --git a/include/realtime_tools/realtime_barrier.hpp b/include/realtime_tools/realtime_barrier.hpp new file mode 100644 index 00000000..84f4cda2 --- /dev/null +++ b/include/realtime_tools/realtime_barrier.hpp @@ -0,0 +1,468 @@ +/* + * Copyright (c) 2021 FlyingEinstein.com + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Publishing ROS messages is difficult, as the publish function is + * not realtime safe. This class provides the proper locking so that + * you can call publish in realtime and a separate (non-realtime) + * thread will ensure that the message gets published over ROS. + * + * Author: Colin F. MacKenzie + */ + +#ifndef REALTIME_TOOLS__REALTIME_BARRIER_HPP_ +#define REALTIME_TOOLS__REALTIME_BARRIER_HPP_ + +#include +#include + + +namespace realtime_tools +{ + +/// @brief Attempt a lock on a resource but fail if already locked +class try_lock : public std::unique_lock +{ +public: + explicit try_lock(std::mutex & m) + : std::unique_lock(m, std::try_to_lock) + {} +}; + +/// @brief Lock a resource or wait for it to be free +class wait_lock : public std::unique_lock +{ +public: + explicit wait_lock(std::mutex & m) + : std::unique_lock(m, std::defer_lock) + { + // will wait some time + while (!try_lock()) { + std::this_thread::sleep_for(std::chrono::microseconds(500)); + } + } +}; + +/// @brief Default options for realtime access to resources +struct realtime +{ + using lock_strategy = try_lock; +}; + +/// @brief Default options for non-realtime access to resources +struct non_realtime +{ + using lock_strategy = wait_lock; +}; + + +/// @brief Implements the underlying mechanisms for communicating between threads. +/// MemoryBarrier implements a double buffer swapping mechanism for interchanging data between +/// threads. It allocates two T objects on the heap. When MemoryBarrier::swap() is called the two +/// pointers are swapped. Generally one pointer is accessed from the non-realtime thread and the +/// other is used by the realtime thread. Only one-way communication, in either direction, is +/// possible. For two-way communication two separate MemoryBarriers can be used as a pair. +/// +/// You should not use this class directly, you will work with ReadBarrier, WriteBarrier and +/// DirectAccess classes only. +/// +/// General strategy: +/// For non-RT writing to RT thread, non-RT thread writes but doesn't swap, RT swaps and reads. +/// For RT writing to non-RT, RT thread writes but doesn't swap, non-RT swaps and reads. +template +class MemoryBarrier +{ +public: + /// @brief Provides direct access to data + /// Should only be used from within the realtime thread. Default template args specify + /// DirectAccess should only try to lock the resource and on failure the DirectAccess object + /// would be equal to nullptr and therefore evaluate to false. + /// This is also used internally by the non-RT ReadBarrier and WriteBarrier classes but + /// implements the thread-safe access. + template + class DirectAccess : public lock_strategy + { +public: + using MemoryBarrierType = MemoryBarrier; + + explicit DirectAccess(MemoryBarrierType & mem_barrier) noexcept + : lock_strategy(mem_barrier.mutex_), mem_(&mem_barrier), obj_(nullptr) + { + obj_ = _get(); // Will return nullptr if we don't own the lock yet + } + + template + explicit DirectAccess(X & mem_barrier) noexcept + : lock_strategy(mem_barrier.memory().mutex_), mem_(&mem_barrier.memory()), obj_(nullptr) + { + obj_ = _get(); // will return nullptr if we dont own the lock yet + } + + template + explicit DirectAccess(X * mem_barrier) noexcept + : lock_strategy(mem_barrier->memory().mutex_), mem_(&mem_barrier->memory()), obj_(nullptr) + { + obj_ = _get(); // will return nullptr if we dont own the lock yet + } + + + // do not allow copying, only move semantics + DirectAccess(const DirectAccess &) = delete; + + DirectAccess & operator=(const DirectAccess &) = delete; + + DirectAccess(DirectAccess && move) noexcept + : mem_(move.mem_), obj_(move.obj_) + { + move.mem_ = nullptr; + } + + DirectAccess & operator=(DirectAccess && move) noexcept + { + mem_ = move.mem_; + move.mem_ = nullptr; + obj_ = move.obj_; + return *this; + } + + inline void reset() + { + if (lock_strategy::owns_lock()) { + lock_strategy::unlock(); + } + mem_ = nullptr; + obj_ = nullptr; + } + + inline bool new_data_available() const + { + return mem_->new_data_available(); + } + + inline void new_data_available(bool avail) + { + if (!lock_strategy::owns_lock()) { + throw std::runtime_error("Can't modify an unlocked MemoryBarrier"); + } + mem_->new_data_available_ = avail; + } + + inline void swap() + { + if (!std::unique_lock::owns_lock()) { + throw std::runtime_error("Can't swap an unlocked MemoryBarrier"); + } + mem_->swap(); + obj_ = nullptr; // Must explicitly set to null so _get() doesn't return cached value + obj_ = _get(); + } + + inline explicit operator bool() const + {return mem_ != nullptr && get();} + + inline bool operator==(std::nullptr_t) const + {return mem_ == nullptr || get() == nullptr;} + + inline bool operator!=(std::nullptr_t) const + {return mem_ != nullptr && get();} + + // smartptr semantics + inline T * get() + {return _get();} + + inline const T * get() const + {return _get();} + + inline T * operator->() + {return get();} + + inline const T * operator->() const + {return get();} + + inline T & operator*() + {return *get();} + + inline const T & operator*() const + {return *get();} + +private: + MemoryBarrierType * mem_; + mutable T * obj_; + + template + inline typename std::enable_if_t::value, + T *> + _get() + { + return obj_ ? + obj_ : + lock_strategy::owns_lock() ? + obj_ = mem_->rt_ : + nullptr; + } + + template + inline typename std::enable_if_t::value, T *> + _get() + { + return obj_ ? + obj_ : + lock_strategy::owns_lock() ? + obj_ = mem_->nrt_ : + nullptr; + } + + template + inline typename std::enable_if_t::value, + const T *> + _get() const + { + return obj_ ? + obj_ : + lock_strategy::owns_lock() ? + obj_ = mem_->rt_ : + nullptr; + } + + template + inline typename std::enable_if_t::value, const T *> + _get() const + { + return obj_ ? + obj_ : + lock_strategy::owns_lock() ? + obj_ = mem_->nrt_ : + nullptr; + } + }; + + + MemoryBarrier() + : nrt_(new T()), rt_(new T()), polarity_(false), new_data_available_(false) + { + } + + explicit MemoryBarrier(const T & data) + : nrt_(new T(data)), rt_(new T(data)), polarity_(false), new_data_available_(false) + { + } + + explicit MemoryBarrier(MemoryBarrier && move) noexcept + : nrt_(move.nrt_), rt_(move.rt_), polarity_(move.polarity_), new_data_available_( + move.new_data_available_) + { + move.nrt_ = nullptr; + move.rt_ = nullptr; + move.polarity_ = false; + move.new_data_available_ = false; + } + + // allow moving but not copying + MemoryBarrier(const MemoryBarrier &) = delete; + + MemoryBarrier & operator=(const MemoryBarrier &) = delete; + + + // todo: perhaps we need a new_data on both nrt and rt side + // true if new data is available, depends on if the memory is being used for Read + // from RT or Write to RT mode + inline bool new_data_available() const + {return new_data_available_;} + + void initialize(const T & value) + { + wait_lock guard(mutex_); + if (guard.owns_lock()) { + *nrt_ = *rt_ = value; + } else { + throw std::runtime_error("request to initialize realtime barrier failed trying to lock"); + } + } + +protected: + T * nrt_; + T * rt_; + bool polarity_; // flipped each time barrier rotates (swaps) + bool new_data_available_; + + // Set as mutable so that readFromNonRT() can be performed on a const buffer + mutable std::mutex mutex_; + + inline void swap() + { + // swap pointers + T * tmp = nrt_; + nrt_ = rt_; + rt_ = tmp; + polarity_ = !polarity_; + } + + friend class DirectAccess; + + friend class DirectAccess; +}; + +/// @brief Create a barrier for reading data from a realtime thread +/// ReadBarrier implements reading data from a realtime thread using a MemoryBarrier. For example, +/// this is used to transfer state data out of hardware interfaces. The default constructor will +// create a new memory barrier for use. There is an alternate constructor if you want to create +// your own MemoryBarrier explicitly. +template +class ReadBarrier +{ +public: + using MemoryBarrierType = MemoryBarrier; + + ReadBarrier() noexcept + : mem_(new MemoryBarrierType()), owns_mem(true) + { + } + + explicit ReadBarrier(MemoryBarrierType & mem_barrier) noexcept + : mem_(&mem_barrier), owns_mem(false) + { + } + + virtual ~ReadBarrier() + { + if (owns_mem) { + delete mem_; + } + } + + /// @brief Access the underlying MemoryBarrier object + inline MemoryBarrierType & memory() + {return *mem_;} + + /// @brief Returns true if new data is available to read + inline bool new_data_available() const + {return mem_->new_data_available();} + + /// @brief Get current value from non-realtime side without affecting the barrier. + /// Copy the current data into dest. No swap is performed and the new_data_available flag is + /// unaffected. + bool current(T & dest) + { + typename MemoryBarrierType::template DirectAccess direct(*mem_); + if (direct) { + dest = *direct; + return true; + } else { + return false; + } + } + + /// @brief Read data out of the realtime thread + /// Swap RT buffer for non-RT. Copy the new data into val and reset the new_data_available flag. + // todo: since this read also swaps, should it be called read_and_swap() or pop() or pull() + bool pull(T & dest) + { + typename MemoryBarrierType::template DirectAccess direct(*mem_); + if (direct && mem_->new_data_available()) { + direct.swap(); + direct.new_data_available(false); + dest = *direct; + return true; + } else { + return false; // failed to read + } + } + +private: + MemoryBarrierType * mem_; + bool owns_mem; +}; + + +/// @brief Create a barrier for writing data to a realtime thread +/// WriteBarrier implements writing data to a realtime thread from a non-realtime thread using a +/// MemoryBarrier. For example, this is used to transfer commands to a hardware interface. The +/// default constructor will create a new memory barrier for use. There is an alternate +/// constructor if you want to create your own MemoryBarrier explicitly. +template +class WriteBarrier +{ +public: + using MemoryBarrierType = MemoryBarrier; + + WriteBarrier() noexcept + : mem_(new MemoryBarrierType()) + { + } + + explicit WriteBarrier(MemoryBarrierType & mem_barrier) noexcept + : mem_(&mem_barrier) + { + } + + /// @brief Access the underlying MemoryBarrier object + inline MemoryBarrierType & memory() + {return *mem_;} + + /// @brief Get current value from non-realtime side without affecting the barrier. + /// Copy the current data into dest. No swap is performed and the new_data_available flag + /// is unaffected. + bool current(T & dest) + { + typename MemoryBarrierType::template DirectAccess direct(*mem_); + if (direct) { + dest = *direct; + return true; + } else { + return false; + } + } + + /// @brief Write data into the realtime thread + /// Copy the new data from val into non-RT buffer then swap RT buffer for non-RT and set the + /// new_data_available flag. + bool push(const T & data) + { + typename MemoryBarrierType::template DirectAccess direct(*mem_); + if (direct) { + *direct = data; + direct.swap(); + direct.new_data_available(true); + return true; + } else { + return false; + } + } + +private: + MemoryBarrierType * mem_; +}; + +} // namespace realtime_tools + + +#endif // REALTIME_TOOLS__REALTIME_BARRIER_HPP_ diff --git a/test/realtime_barrier_tests.cpp b/test/realtime_barrier_tests.cpp new file mode 100644 index 00000000..83ae80b2 --- /dev/null +++ b/test/realtime_barrier_tests.cpp @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2021, FlyingEinstein.com + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include + +using realtime_tools::MemoryBarrier; +using realtime_tools::ReadBarrier; +using realtime_tools::WriteBarrier; + +class DefaultConstructable +{ +public: + DefaultConstructable() + : number_(42) {} + ~DefaultConstructable() {} + int number_; +}; + +/* + * MemoryBarriers - base thread barrier mechanisms + */ + +TEST(MemoryBarrier, default_construct) +{ + MemoryBarrier buffer; + decltype(buffer)::DirectAccess<> da(buffer); + EXPECT_EQ(42, da->number_); +} + +TEST(MemoryBarrier, default_construct_new_data_flag_is_false) +{ + MemoryBarrier buffer; + EXPECT_FALSE(buffer.new_data_available()); +} + +TEST(MemoryBarrier, initialize_new_data) +{ + MemoryBarrier buffer; + DefaultConstructable x; + x.number_ = 12; + buffer.initialize(x); + EXPECT_FALSE(buffer.new_data_available()); // should still be false + decltype(buffer)::DirectAccess<> da(buffer); + EXPECT_EQ(12, da->number_); +} + + +/* + * MemoryBarrier::DirectAccess - directly manipulate a memory barrier + */ + +TEST(DirectAccess, new_data_available_flag) +{ + MemoryBarrier buffer; + decltype(buffer)::DirectAccess<> da(buffer); + EXPECT_EQ(42, da->number_); + EXPECT_FALSE(da.new_data_available()); + da.new_data_available(true); + EXPECT_TRUE(da.new_data_available()); +} + + +TEST(DirectAccess, reset_releases_resource) +{ + MemoryBarrier buffer; + decltype(buffer)::DirectAccess rt(buffer); + EXPECT_NE(rt.get(), nullptr); + EXPECT_TRUE(rt); + rt->number_ = 12; + EXPECT_EQ(12, rt->number_); + rt.reset(); + EXPECT_EQ(rt.get(), nullptr); + EXPECT_FALSE(rt); + decltype(buffer)::DirectAccess rt2(buffer); + EXPECT_NE(rt2.get(), nullptr); + EXPECT_TRUE(rt2); + EXPECT_EQ(12, rt2->number_); +} + +TEST(DirectAccess, swap_buffers) +{ + MemoryBarrier buffer; + decltype(buffer)::DirectAccess nrt(buffer); + nrt->number_ = 24; + nrt.reset(); // required or RT request below will cause a deadlock + + decltype(buffer)::DirectAccess rt(buffer); + rt->number_ = 12; + rt.swap(); + EXPECT_EQ(24, rt->number_); + rt.reset(); // required to prevent deadlock again + + // now check non-realtime + decltype(buffer)::DirectAccess nrt2(buffer); + EXPECT_EQ(12, nrt2->number_); +} + +TEST(DirectAccess, realtime_fails_on_locked_resource) +{ + MemoryBarrier buffer; + decltype(buffer)::DirectAccess nrt(buffer); + + decltype(buffer)::DirectAccess rt(buffer); + EXPECT_EQ(rt.get(), nullptr); + EXPECT_FALSE(rt); +} + + +/* + * Read Barriers + */ + +TEST(ReadBarrier, default_construct) +{ + ReadBarrier buffer; + decltype(buffer)::MemoryBarrierType::DirectAccess<> da(buffer); + EXPECT_EQ(42, da->number_); +} + +TEST(ReadBarrier, write_from_RT_to_nRT) +{ + // write state to non-RT + ReadBarrier buffer; + decltype(buffer)::MemoryBarrierType::DirectAccess<> da_writer(buffer); + da_writer->number_ = 8; // using smart_ptr-like semantics + da_writer.new_data_available(true); // indicate to non-RT there is new state data + da_writer.reset(); // unlock the state barrier +} + +TEST(ReadBarrier, read_on_nRT) +{ + // write state to non-RT + ReadBarrier buffer; + + // repeat RT write + decltype(buffer)::MemoryBarrierType::DirectAccess<> da_writer(buffer); + da_writer->number_ = 8; // using smart_ptr-like semantics + da_writer.new_data_available(true); // indicate to non-RT there is new state data + da_writer.reset(); // unlock the state barrier + + // now test the read from nRT thread + // get the data from RT thread and write into our user + DefaultConstructable state; + EXPECT_TRUE(buffer.memory().new_data_available()); + EXPECT_TRUE(buffer.pull(state)); + EXPECT_EQ(8, state.number_); +} + + +/* + * Write Barriers + */ + +TEST(WriteBarrier, default_construct) +{ + WriteBarrier buffer; + decltype(buffer)::MemoryBarrierType::DirectAccess<> da(buffer.memory()); + EXPECT_EQ(42, da->number_); +} + +TEST(WriteBarrier, write_to_RT_from_nRT) +{ + WriteBarrier buffer; + + DefaultConstructable command; + command.number_ = 52; + + // write commands to RT thread + EXPECT_FALSE(buffer.memory().new_data_available()); + EXPECT_TRUE(buffer.push(command)); + EXPECT_TRUE(buffer.memory().new_data_available()); +} + +TEST(WriteBarrier, read_on_RT) +{ + WriteBarrier buffer; + + DefaultConstructable command; + command.number_ = 52; + + // write commands to RT thread + EXPECT_TRUE(buffer.push(command)); + + // confirm read from RT + decltype(buffer)::MemoryBarrierType::DirectAccess<> da_reader(buffer); + EXPECT_TRUE(da_reader.new_data_available()); + EXPECT_EQ(da_reader->number_, 52); // using smart_ptr-like semantics +}