diff --git a/ecal/core/CMakeLists.txt b/ecal/core/CMakeLists.txt index 3f320f2677..d5f4e8d35f 100644 --- a/ecal/core/CMakeLists.txt +++ b/ecal/core/CMakeLists.txt @@ -402,6 +402,7 @@ endif() set(ecal_util_src src/util/ecal_expmap.h src/util/ecal_thread.h + src/util/frequency_calculator.h src/util/getenvvar.h ) if (ECAL_CORE_COMMAND_LINE) diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 264bda476c..5d1985f7bb 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -70,8 +70,7 @@ namespace eCAL m_read_time(0), m_receive_time(0), m_clock(0), - m_clock_old(0), - m_freq(0), + m_frequency_calculator(3.0f), m_message_drops(0), m_loc_published(false), m_ext_published(false), @@ -98,9 +97,7 @@ namespace eCAL m_topic_id.clear(); m_topic_info = topic_info_; m_clock = 0; - m_clock_old = 0; m_message_drops = 0; - m_rec_time = std::chrono::steady_clock::time_point(); m_created = false; #ifndef NDEBUG // log it @@ -164,10 +161,7 @@ namespace eCAL // reset defaults m_created = false; m_clock = 0; - m_clock_old = 0; - m_freq = 0; m_message_drops = 0; - m_rec_time = std::chrono::steady_clock::time_point(); m_use_udp_mc_confirmed = false; m_use_shm_confirmed = false; @@ -299,7 +293,7 @@ namespace eCAL ecal_reg_sample_topic.pname = m_pname; ecal_reg_sample_topic.uname = Process::GetUnitName(); ecal_reg_sample_topic.dclock = m_clock; - ecal_reg_sample_topic.dfreq = m_freq; + ecal_reg_sample_topic.dfreq = GetFrequency(); ecal_reg_sample_topic.message_drops = static_cast(m_message_drops); // we do not know the number of connections .. @@ -477,6 +471,13 @@ namespace eCAL // increase read clock m_clock++; + // Update frequency calculation + { + const auto receive_time = std::chrono::steady_clock::now(); + const std::lock_guard freq_lock(m_frequency_calculator_mutex); + m_frequency_calculator.addTick(receive_time); + } + // reset timeout m_receive_time = 0; @@ -889,6 +890,13 @@ namespace eCAL // should never be reached return false; } + + int32_t CDataReader::GetFrequency() + { + const auto frequency_time = std::chrono::steady_clock::now(); + const std::lock_guard lock(m_frequency_calculator_mutex); + return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); + } void CDataReader::RefreshRegistration() { @@ -896,30 +904,6 @@ namespace eCAL // ensure that registration is not called within zero nanoseconds // normally it will be called from registration logic every second - auto curr_time = std::chrono::steady_clock::now(); - if (std::chrono::duration_cast(curr_time - m_rec_time) > std::chrono::milliseconds(0)) - { - // reset clock and time on first call - if (m_clock_old == 0) - { - m_clock_old = m_clock; - m_rec_time = curr_time; - } - - // check for clock difference - else if ((m_clock - m_clock_old) > 0) - { - // calculate frequency in mHz - m_freq = static_cast((1000 * 1000 * (m_clock - m_clock_old)) / std::chrono::duration_cast(curr_time - m_rec_time).count()); - // reset clock and time - m_clock_old = m_clock; - m_rec_time = curr_time; - } - else - { - m_freq = 0; - } - } // register without send Register(false); @@ -945,6 +929,7 @@ namespace eCAL { std::stringstream out; + out << '\n'; out << indent_ << "------------------------------------" << '\n'; out << indent_ << " class CDataReader " << '\n'; @@ -960,8 +945,7 @@ namespace eCAL out << indent_ << "m_read_buf.size(): " << m_read_buf.size() << '\n'; out << indent_ << "m_read_time: " << m_read_time << '\n'; out << indent_ << "m_clock: " << m_clock << '\n'; - out << indent_ << "m_rec_time: " << std::chrono::duration_cast(m_rec_time.time_since_epoch()).count() << '\n'; - out << indent_ << "m_freq: " << m_freq << '\n'; + out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n'; out << indent_ << "m_created: " << m_created << '\n'; out << '\n'; diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index 2f7613b539..d812ffb940 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -44,6 +44,8 @@ #include #include +#include + namespace eCAL { class CDataReader @@ -108,6 +110,8 @@ namespace eCAL void Disconnect(); bool CheckMessageClock(const std::string& tid_, long long current_clock_); + int32_t GetFrequency(); + std::string m_host_name; std::string m_host_group_name; int m_pid; @@ -141,9 +145,9 @@ namespace eCAL EventCallbackMapT m_event_callback_map; std::atomic m_clock; - long long m_clock_old; - std::chrono::steady_clock::time_point m_rec_time; - long m_freq; + + std::mutex m_frequency_calculator_mutex; + ResettableFrequencyCalculator m_frequency_calculator; std::set m_id_set; diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 57315f1e57..d47529e6c3 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -80,8 +80,7 @@ namespace eCAL m_connected(false), m_id(0), m_clock(0), - m_clock_old(0), - m_freq(0), + m_frequency_calculator(3.0f), m_loc_subscribed(false), m_ext_subscribed(false), m_use_ttype(true), @@ -117,9 +116,6 @@ namespace eCAL m_topic_info = topic_info_; m_id = 0; m_clock = 0; - m_clock_old = 0; - m_snd_time = std::chrono::steady_clock::time_point(); - m_freq = 0; m_buffering_shm = Config::GetMemfileBufferCount(); m_zero_copy = Config::IsMemfileZerocopyEnabled(); m_acknowledge_timeout_ms = Config::GetMemfileAckTimeoutMs(); @@ -196,9 +192,6 @@ namespace eCAL // reset defaults m_id = 0; m_clock = 0; - m_clock_old = 0; - m_snd_time = std::chrono::steady_clock::time_point(); - m_freq = 0; m_buffering_shm = Config::GetMemfileBufferCount(); m_zero_copy = Config::IsMemfileZerocopyEnabled(); m_acknowledge_timeout_ms = Config::GetMemfileAckTimeoutMs(); @@ -408,6 +401,13 @@ namespace eCAL size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long id_) { + { + // we should think about if we would like to potentially use the `time_` variable to tick with (but we would need the same base for checking incoming samples then.... + const auto send_time = std::chrono::steady_clock::now(); + const std::lock_guard lock(m_frequency_calculator_mutex); + m_frequency_calculator.addTick(send_time); + } + // check writer modes if (!CheckWriterModes()) { @@ -717,32 +717,6 @@ namespace eCAL { if (!m_created) return; - // force to register every second to refresh data clock information - auto curr_time = std::chrono::steady_clock::now(); - if (std::chrono::duration_cast(curr_time - m_snd_time) > std::chrono::milliseconds(0)) - { - // reset clock and time on first call - if (m_clock_old == 0) - { - m_clock_old = m_clock; - m_snd_time = curr_time; - } - - // check for clock difference - if ((m_clock - m_clock_old) > 0) - { - // calculate frequency in mHz - m_freq = static_cast((1000 * 1000 * (m_clock - m_clock_old)) / std::chrono::duration_cast(curr_time - m_snd_time).count()); - // reset clock and time - m_clock_old = m_clock; - m_snd_time = curr_time; - } - else - { - m_freq = 0; - } - } - // register without send Register(false); @@ -785,6 +759,7 @@ namespace eCAL out << indent_ << "m_topic_info.desc: " << m_topic_info.descriptor << '\n'; out << indent_ << "m_id: " << m_id << '\n'; out << indent_ << "m_clock: " << m_clock << '\n'; + out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n'; out << indent_ << "m_created: " << m_created << '\n'; out << indent_ << "m_loc_subscribed: " << m_loc_subscribed << '\n'; out << indent_ << "m_ext_subscribed: " << m_ext_subscribed << '\n'; @@ -877,7 +852,7 @@ namespace eCAL ecal_reg_sample_topic.uname = Process::GetUnitName(); ecal_reg_sample_topic.did = m_id; ecal_reg_sample_topic.dclock = m_clock; - ecal_reg_sample_topic.dfreq = m_freq; + ecal_reg_sample_topic.dfreq = GetFrequency(); size_t loc_connections(0); size_t ext_connections(0); @@ -1249,4 +1224,10 @@ namespace eCAL (void)base_msg_; #endif } + int32_t CDataWriter::GetFrequency() + { + const auto frequency_time = std::chrono::steady_clock::now(); + const std::lock_guard lock(m_frequency_calculator_mutex); + return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); + } } diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index 47d550f460..e0fd19afee 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -33,6 +33,7 @@ #include "ecal_def.h" #include "util/ecal_expmap.h" +#include #if ECAL_CORE_TRANSPORT_UDP #include "udp/ecal_writer_udp_mc.h" @@ -148,6 +149,8 @@ namespace eCAL bool IsInternalSubscribedOnly(); void LogSendMode(TLayer::eSendMode smode_, const std::string& base_msg_); + int32_t GetFrequency(); + std::string m_host_name; std::string m_host_group_name; int m_pid; @@ -178,9 +181,9 @@ namespace eCAL long long m_id; long long m_clock; - long long m_clock_old; - std::chrono::steady_clock::time_point m_snd_time; - long m_freq; + + std::mutex m_frequency_calculator_mutex; + ResettableFrequencyCalculator m_frequency_calculator; std::atomic m_loc_subscribed; std::atomic m_ext_subscribed; diff --git a/ecal/core/src/util/frequency_calculator.h b/ecal/core/src/util/frequency_calculator.h new file mode 100644 index 0000000000..1ec85f4f04 --- /dev/null +++ b/ecal/core/src/util/frequency_calculator.h @@ -0,0 +1,142 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2019 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +/** + * @brief This file provides classes to calculate frequencies. + * These classes are NOT threadsafe! +**/ + +#pragma once + +#include +#include +#include +#include + +namespace eCAL +{ + template + double calculateFrequency(const std::chrono::time_point& start, const std::chrono::time_point& end, long long ticks) + { + auto time_delta = (end - start).count(); + if (time_delta == 0) + { + return std::numeric_limits::max(); + } + auto denominator = typename T::duration::period().den; + auto numerator = typename T::duration::period().num; + return ((double)denominator * ticks / ((double) numerator * time_delta)); + } + + template + class FrequencyCalculator + { + public: + using time_point = std::chrono::time_point; + + FrequencyCalculator(const time_point& now_) + : counted_elements(0) + , first_tick(now_) + , last_tick(now_) + , previous_frequency(0.0) + { + } + + void addTick(const time_point& now) + { + counted_elements++; + last_tick = now; + } + + double getFrequency() + { + if (counted_elements == 0) + { + return previous_frequency; + } + + previous_frequency = calculateFrequency(first_tick, last_tick, counted_elements); + first_tick = last_tick; + counted_elements = 0; + return previous_frequency; + } + + private: + long long counted_elements; + time_point first_tick; + time_point last_tick; + double previous_frequency; + }; + + + template + class ResettableFrequencyCalculator + { + public: + using time_point = std::chrono::time_point; + + ResettableFrequencyCalculator(float reset_factor_) + : reset_factor(reset_factor_) + {} + + void addTick(const time_point& now) + { + if (!calculator) + { + calculator = std::make_unique>(now); + } + else + { + calculator->addTick(now); + } + last_tick = now; + } + + double getFrequency(const time_point& now) + { + double frequency = calculator ? calculator->getFrequency() : 0.0; + + if (frequency == 0.0) + { + return 0.0; + } + + // calculate theoretical frequency to detect timeouts; + double theoretical_frequency = calculateFrequency(last_tick, now, 1); + // If the frequency is higher than reset_factor * theoretical_frequency, we reset. + if (frequency >= theoretical_frequency * reset_factor) + { + calculator.reset(); + return 0.0; + } + + return frequency; + } + + private: + float reset_factor; + time_point last_tick; + std::unique_ptr> calculator; + }; +} + + + + + diff --git a/ecal/tests/cpp/util_test/src/util_test.cpp b/ecal/tests/cpp/util_test/src/util_test.cpp index cf60821967..c5cd59b3ca 100644 --- a/ecal/tests/cpp/util_test/src/util_test.cpp +++ b/ecal/tests/cpp/util_test/src/util_test.cpp @@ -17,8 +17,10 @@ * ========================= eCAL LICENSE ================================= */ +#include #include #include +#include namespace { @@ -52,3 +54,141 @@ TEST(Util, SplitCombinedTopicType) TestSplitCombinedTopicType("base:std::string", "base", "std::string"); TestSplitCombinedTopicType("MyType", "", "MyType"); } + +struct MillisecondFrequencyPair +{ + std::chrono::milliseconds delta_t; + double frequency; +}; + +const std::vector frequency_pairs = +{ + {std::chrono::milliseconds(1000), 1.0}, + {std::chrono::milliseconds(1250), 0.8}, + {std::chrono::milliseconds(5000), 0.2}, + {std::chrono::milliseconds(20000), 0.05}, + {std::chrono::milliseconds(500), 2.0}, + {std::chrono::milliseconds(200), 5.0}, + {std::chrono::milliseconds(100), 10.0}, + {std::chrono::milliseconds(20), 50.0}, + {std::chrono::milliseconds(2), 500.0}, +}; + + +TEST(Util, FrequencyCalculator) +{ + for (const auto& pair : frequency_pairs) + { + { + auto now = std::chrono::steady_clock::now(); + eCAL::FrequencyCalculator calculator(now); + EXPECT_DOUBLE_EQ(calculator.getFrequency(), 0.0f); + + for (int j = 0; j < 100; ++j) + { + for (int i = 0; i < 20; ++i) + { + now = now + pair.delta_t; + calculator.addTick(now); + } + + EXPECT_DOUBLE_EQ(calculator.getFrequency(), pair.frequency); + } + } + + { + auto now = std::chrono::steady_clock::now(); + eCAL::FrequencyCalculator calculator(now); + + for (int i = 0; i < 5; ++i) + { + now = now + pair.delta_t; + calculator.addTick(now); + EXPECT_DOUBLE_EQ(calculator.getFrequency(), pair.frequency); + } + } + } +} + +TEST(Util, ResettableFrequencyCalculator) +{ + const auto check_delta_t = std::chrono::milliseconds(999); + + + for (const auto& pair : frequency_pairs) + { + { + //auto start = std::chrono::steady_clock::now(); + auto start = std::chrono::steady_clock::time_point(std::chrono::milliseconds(0)); + auto next_frequency_update = start; + auto next_tick = start; + eCAL::ResettableFrequencyCalculator calculator(3.0f); + calculator.addTick(next_tick); + EXPECT_DOUBLE_EQ(calculator.getFrequency(next_frequency_update), 0.0f); + + for (int i = 0; i < 100; ++i) + { + next_frequency_update = next_frequency_update + check_delta_t; + + while (next_tick + pair.delta_t <= next_frequency_update) + { + next_tick = next_tick + pair.delta_t; + calculator.addTick(next_tick); + } + + if (next_frequency_update - start < pair.delta_t) + { + // no updates happened yet - expect 0 frequency + EXPECT_DOUBLE_EQ(calculator.getFrequency(next_frequency_update), 0.0); + } + else + { + EXPECT_DOUBLE_EQ(calculator.getFrequency(next_frequency_update), pair.frequency); + } + } + + // Now check timeout behavior + // Calculate when timeout should happen + auto timeout_time = next_tick + 3.0f * pair.delta_t; + for (int i = 0; i < 100; ++i) + { + next_frequency_update = next_frequency_update + check_delta_t; + if (next_frequency_update <= timeout_time) + { + EXPECT_DOUBLE_EQ(calculator.getFrequency(next_frequency_update), pair.frequency); + } + else + { + EXPECT_DOUBLE_EQ(calculator.getFrequency(next_frequency_update), 0.0); + } + } + + // Finally we're getting new ticks again! let's check frequencies again! + auto new_start = next_frequency_update; + next_tick = next_frequency_update; + // Check that filter goes back + calculator.addTick(next_tick); + for (int i = 0; i < 100; ++i) + { + next_frequency_update = next_frequency_update + check_delta_t; + + while (next_tick + pair.delta_t <= next_frequency_update) + { + next_tick = next_tick + pair.delta_t; + calculator.addTick(next_tick); + } + + if (next_frequency_update - new_start < pair.delta_t) + { + // no updates happened yet - expect 0 frequency + EXPECT_DOUBLE_EQ(calculator.getFrequency(next_frequency_update), 0.0); + } + else + { + EXPECT_DOUBLE_EQ(calculator.getFrequency(next_frequency_update), pair.frequency); + } + } + } + } +} +