Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] create FrequencyCalculator class for sender / receiver frequency calculation. #1429

Merged
merged 6 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ endif()
set(ecal_util_src
src/util/ecal_expmap.h
src/util/ecal_thread.h
src/util/frequency_calculator.h
src/util/getenvvar.h
)
if (ECAL_CORE_COMMAND_LINE)
Expand Down
52 changes: 18 additions & 34 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ namespace eCAL
m_read_time(0),
m_receive_time(0),
m_clock(0),
m_clock_old(0),
m_freq(0),
m_frequency_calculator(3.0f),
m_message_drops(0),
m_loc_published(false),
m_ext_published(false),
Expand All @@ -98,9 +97,7 @@ namespace eCAL
m_topic_id.clear();
m_topic_info = topic_info_;
m_clock = 0;
m_clock_old = 0;
m_message_drops = 0;
m_rec_time = std::chrono::steady_clock::time_point();
m_created = false;
#ifndef NDEBUG
// log it
Expand Down Expand Up @@ -164,10 +161,7 @@ namespace eCAL
// reset defaults
m_created = false;
m_clock = 0;
m_clock_old = 0;
m_freq = 0;
m_message_drops = 0;
m_rec_time = std::chrono::steady_clock::time_point();

m_use_udp_mc_confirmed = false;
m_use_shm_confirmed = false;
Expand Down Expand Up @@ -299,7 +293,7 @@ namespace eCAL
ecal_reg_sample_topic.pname = m_pname;
ecal_reg_sample_topic.uname = Process::GetUnitName();
ecal_reg_sample_topic.dclock = m_clock;
ecal_reg_sample_topic.dfreq = m_freq;
ecal_reg_sample_topic.dfreq = GetFrequency();
ecal_reg_sample_topic.message_drops = static_cast<int32_t>(m_message_drops);

// we do not know the number of connections ..
Expand Down Expand Up @@ -477,6 +471,13 @@ namespace eCAL
// increase read clock
m_clock++;

// Update frequency calculation
{
const auto receive_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> freq_lock(m_frequency_calculator_mutex);
m_frequency_calculator.addTick(receive_time);
}

// reset timeout
m_receive_time = 0;

Expand Down Expand Up @@ -889,37 +890,20 @@ namespace eCAL
// should never be reached
return false;
}

int32_t CDataReader::GetFrequency()
{
const auto frequency_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
return static_cast<int32_t>(m_frequency_calculator.getFrequency(frequency_time) * 1000);
}

void CDataReader::RefreshRegistration()
{
if(!m_created) return;

// ensure that registration is not called within zero nanoseconds
// normally it will be called from registration logic every second
auto curr_time = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_rec_time) > std::chrono::milliseconds(0))
{
// reset clock and time on first call
if (m_clock_old == 0)
{
m_clock_old = m_clock;
m_rec_time = curr_time;
}

// check for clock difference
else if ((m_clock - m_clock_old) > 0)
{
// calculate frequency in mHz
m_freq = static_cast<long>((1000 * 1000 * (m_clock - m_clock_old)) / std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_rec_time).count());
// reset clock and time
m_clock_old = m_clock;
m_rec_time = curr_time;
}
else
{
m_freq = 0;
}
}

// register without send
Register(false);
Expand All @@ -945,6 +929,7 @@ namespace eCAL
{
std::stringstream out;


out << '\n';
out << indent_ << "------------------------------------" << '\n';
out << indent_ << " class CDataReader " << '\n';
Expand All @@ -960,8 +945,7 @@ namespace eCAL
out << indent_ << "m_read_buf.size(): " << m_read_buf.size() << '\n';
out << indent_ << "m_read_time: " << m_read_time << '\n';
out << indent_ << "m_clock: " << m_clock << '\n';
out << indent_ << "m_rec_time: " << std::chrono::duration_cast<std::chrono::milliseconds>(m_rec_time.time_since_epoch()).count() << '\n';
out << indent_ << "m_freq: " << m_freq << '\n';
out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n';
out << indent_ << "m_created: " << m_created << '\n';
out << '\n';

Expand Down
10 changes: 7 additions & 3 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <string>
#include <unordered_map>

#include <util/frequency_calculator.h>

namespace eCAL
{
class CDataReader
Expand Down Expand Up @@ -108,6 +110,8 @@ namespace eCAL
void Disconnect();
bool CheckMessageClock(const std::string& tid_, long long current_clock_);

int32_t GetFrequency();

std::string m_host_name;
std::string m_host_group_name;
int m_pid;
Expand Down Expand Up @@ -141,9 +145,9 @@ namespace eCAL
EventCallbackMapT m_event_callback_map;

std::atomic<long long> m_clock;
long long m_clock_old;
std::chrono::steady_clock::time_point m_rec_time;
long m_freq;

std::mutex m_frequency_calculator_mutex;
ResettableFrequencyCalculator<std::chrono::steady_clock> m_frequency_calculator;

std::set<long long> m_id_set;

Expand Down
51 changes: 16 additions & 35 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ namespace eCAL
m_connected(false),
m_id(0),
m_clock(0),
m_clock_old(0),
m_freq(0),
m_frequency_calculator(3.0f),
m_loc_subscribed(false),
m_ext_subscribed(false),
m_use_ttype(true),
Expand Down Expand Up @@ -117,9 +116,6 @@ namespace eCAL
m_topic_info = topic_info_;
m_id = 0;
m_clock = 0;
m_clock_old = 0;
m_snd_time = std::chrono::steady_clock::time_point();
m_freq = 0;
m_buffering_shm = Config::GetMemfileBufferCount();
m_zero_copy = Config::IsMemfileZerocopyEnabled();
m_acknowledge_timeout_ms = Config::GetMemfileAckTimeoutMs();
Expand Down Expand Up @@ -196,9 +192,6 @@ namespace eCAL
// reset defaults
m_id = 0;
m_clock = 0;
m_clock_old = 0;
m_snd_time = std::chrono::steady_clock::time_point();
m_freq = 0;
m_buffering_shm = Config::GetMemfileBufferCount();
m_zero_copy = Config::IsMemfileZerocopyEnabled();
m_acknowledge_timeout_ms = Config::GetMemfileAckTimeoutMs();
Expand Down Expand Up @@ -408,6 +401,13 @@ namespace eCAL

size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long id_)
{
{
// we should think about if we would like to potentially use the `time_` variable to tick with (but we would need the same base for checking incoming samples then....
const auto send_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
m_frequency_calculator.addTick(send_time);
}

// check writer modes
if (!CheckWriterModes())
{
Expand Down Expand Up @@ -717,32 +717,6 @@ namespace eCAL
{
if (!m_created) return;

// force to register every second to refresh data clock information
auto curr_time = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_snd_time) > std::chrono::milliseconds(0))
{
// reset clock and time on first call
if (m_clock_old == 0)
{
m_clock_old = m_clock;
m_snd_time = curr_time;
}

// check for clock difference
if ((m_clock - m_clock_old) > 0)
{
// calculate frequency in mHz
m_freq = static_cast<long>((1000 * 1000 * (m_clock - m_clock_old)) / std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_snd_time).count());
// reset clock and time
m_clock_old = m_clock;
m_snd_time = curr_time;
}
else
{
m_freq = 0;
}
}

// register without send
Register(false);

Expand Down Expand Up @@ -785,6 +759,7 @@ namespace eCAL
out << indent_ << "m_topic_info.desc: " << m_topic_info.descriptor << '\n';
out << indent_ << "m_id: " << m_id << '\n';
out << indent_ << "m_clock: " << m_clock << '\n';
out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n';
out << indent_ << "m_created: " << m_created << '\n';
out << indent_ << "m_loc_subscribed: " << m_loc_subscribed << '\n';
out << indent_ << "m_ext_subscribed: " << m_ext_subscribed << '\n';
Expand Down Expand Up @@ -877,7 +852,7 @@ namespace eCAL
ecal_reg_sample_topic.uname = Process::GetUnitName();
ecal_reg_sample_topic.did = m_id;
ecal_reg_sample_topic.dclock = m_clock;
ecal_reg_sample_topic.dfreq = m_freq;
ecal_reg_sample_topic.dfreq = GetFrequency();

size_t loc_connections(0);
size_t ext_connections(0);
Expand Down Expand Up @@ -1249,4 +1224,10 @@ namespace eCAL
(void)base_msg_;
#endif
}
int32_t CDataWriter::GetFrequency()
{
const auto frequency_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
return static_cast<int32_t>(m_frequency_calculator.getFrequency(frequency_time) * 1000);
}
}
9 changes: 6 additions & 3 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "ecal_def.h"
#include "util/ecal_expmap.h"
#include <util/frequency_calculator.h>

#if ECAL_CORE_TRANSPORT_UDP
#include "udp/ecal_writer_udp_mc.h"
Expand Down Expand Up @@ -148,6 +149,8 @@ namespace eCAL
bool IsInternalSubscribedOnly();
void LogSendMode(TLayer::eSendMode smode_, const std::string& base_msg_);

int32_t GetFrequency();

std::string m_host_name;
std::string m_host_group_name;
int m_pid;
Expand Down Expand Up @@ -178,9 +181,9 @@ namespace eCAL

long long m_id;
long long m_clock;
long long m_clock_old;
std::chrono::steady_clock::time_point m_snd_time;
long m_freq;

std::mutex m_frequency_calculator_mutex;
ResettableFrequencyCalculator<std::chrono::steady_clock> m_frequency_calculator;

std::atomic<bool> m_loc_subscribed;
std::atomic<bool> m_ext_subscribed;
Expand Down
Loading
Loading