From 02abba320fd108eebae594dcb812e0057a8ee8f4 Mon Sep 17 00:00:00 2001 From: reicheratwork <66302498+reicheratwork@users.noreply.github.com> Date: Fri, 9 Jun 2023 11:56:38 +0200 Subject: [PATCH] Throughput example fixes (#404) * CP-298 Fixed the output formatting of the throughput example programs to match that of the CycloneDDS-C throughput examples Signed-off-by: Martijn Reicher * CP-301 Removed output dots Signed-off-by: Martijn Reicher * CP-302 Set batched writes on throughput example programs Signed-off-by: Martijn Reicher * CP-300 Fixed default parameters on throughput example programs to match those on CycloneDDS-C Signed-off-by: Martijn Reicher * CP-299 Fixed QoS on throughput example programs Signed-off-by: Martijn Reicher * Fix type conversion warning Windows does not like converting from size_t to unsigned long, so made that conversion explicit Signed-off-by: Martijn Reicher * Fix rate calculation Signed-off-by: Erik Boasson * Throughput example: only flush if burstInterval>0 Signed-off-by: Erik Boasson --------- Signed-off-by: Martijn Reicher Signed-off-by: Erik Boasson Co-authored-by: Erik Boasson --- examples/throughput/publisher.cpp | 32 ++++++++++++------------------ examples/throughput/subscriber.cpp | 30 +++++++++++++++++----------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/examples/throughput/publisher.cpp b/examples/throughput/publisher.cpp index 2d523fd3..154bceba 100644 --- a/examples/throughput/publisher.cpp +++ b/examples/throughput/publisher.cpp @@ -39,11 +39,11 @@ static volatile sig_atomic_t done(false); /*semaphore for keeping track of wheth static uint32_t payloadSize(8192); /*size of the payload of the sent messages*/ -static std::chrono::milliseconds burstInterval(100); /*interval between bursts of messages*/ +static std::chrono::milliseconds burstInterval(0); /*interval between bursts of messages*/ -static uint32_t burstSize(10); /*number of messages to send each burst*/ +static uint32_t burstSize(1); /*number of messages to send each burst*/ -static std::chrono::seconds timeOut(30); /*timeout before the writer will give up*/ +static std::chrono::seconds timeOut(0); /*timeout before the writer will give up*/ static std::string partitionName("Throughput example"); /*name of the domain on which the throughput test is run*/ @@ -117,15 +117,11 @@ bool wait_for_reader(dds::pub::DataWriter &writer) void start_writing( dds::pub::DataWriter &writer, - ThroughputModule::DataType &sample, - std::chrono::milliseconds &d_int, - uint32_t b_size, - std::chrono::seconds &t_out) + ThroughputModule::DataType &sample) { bool timedOut = false; auto pubStart = std::chrono::steady_clock::now(); - auto reportstart = pubStart; if (!done) { @@ -134,7 +130,7 @@ void start_writing( while (!done && !timedOut) { auto burstStart = std::chrono::steady_clock::now(); - for (uint32_t i = 0; i < b_size; i++) + for (uint32_t i = 0; i < burstSize; i++) { try { writer.write(sample); @@ -150,17 +146,15 @@ void start_writing( sample.count()++; } - writer->write_flush(); - std::this_thread::sleep_until(burstStart + d_int); + if (burstInterval > std::chrono::seconds(0)) { + writer->write_flush(); + std::this_thread::sleep_until(burstStart + burstInterval); + } auto n = std::chrono::steady_clock::now(); - if (t_out > std::chrono::milliseconds(0) && - n > pubStart+t_out) - { + if (timeOut > std::chrono::milliseconds(0) && + n > pubStart+timeOut) { timedOut = true; - } else if (reportstart + std::chrono::seconds(1) < n) { - std::cout << "." << std::flush; - reportstart = n; } if (writer.publication_matched_status().current_count() == 0) { @@ -196,7 +190,7 @@ int main (int argc, char **argv) dds::pub::Publisher publisher(participant, pqos); - dds::pub::qos::DataWriterQos wqos; + dds::pub::qos::DataWriterQos wqos(tqos); wqos << dds::core::policy::WriterBatching::BatchUpdates(); dds::pub::DataWriter writer(publisher, topic, wqos); @@ -211,7 +205,7 @@ int main (int argc, char **argv) signal (SIGINT, sigint); /* Register the sample instance and write samples repeatedly or until time out */ - start_writing(writer, sample, burstInterval, burstSize, timeOut); + start_writing(writer, sample); /* Cleanup */ writer.dispose_instance(sample); diff --git a/examples/throughput/subscriber.cpp b/examples/throughput/subscriber.cpp index b3f72da5..d132a4a6 100644 --- a/examples/throughput/subscriber.cpp +++ b/examples/throughput/subscriber.cpp @@ -40,6 +40,8 @@ static unsigned long long outOfOrder(0); /*keeps track of out of order samples*/ static unsigned long long total_bytes(0); /*keeps track of total bytes received*/ +static unsigned long payloadSize(0); /*size of the last payload received*/ + static unsigned long long total_samples(0); /*keeps track of total samples received*/ static std::chrono::milliseconds pollingDelay(-1); /*default is a listener*/ @@ -100,7 +102,8 @@ unsigned long long do_take(dds::sub::DataReader& rd) outOfOrder++; valid_samples++; - total_bytes += s.data().payload().size(); + payloadSize = static_cast(s.data().payload().size()); + total_bytes += payloadSize; it->second = ct+1; } @@ -179,11 +182,13 @@ void process_samples(dds::sub::DataReader &reader, s if (time_now > prev_time + std::chrono::seconds(1) && total_samples != prev_samples) { /* Output intermediate statistics */ - auto deltaTime = static_cast((time_now-prev_time).count())/1e9; //always 1e9? - std::cout << subprefix << "Received " << total_samples-prev_samples << " samples totalling " << total_bytes-prev_bytes << " bytes in " - << std::setprecision(4) << deltaTime << " seconds " << "| Rates: " << static_cast(total_samples - prev_samples) / deltaTime - << " samples/s, " << static_cast(total_bytes - prev_bytes) / (deltaTime*BYTES_PER_SEC_TO_MEGABITS_PER_SEC) << " Mbit/s, with " - << outOfOrder << " samples received out of order.\n" << std::flush; + auto deltaTime = std::chrono::duration(time_now - prev_time).count(); + printf ("=== [Subscriber] %5.3f Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples " + "Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n", + deltaTime, payloadSize, total_samples, total_bytes, outOfOrder, + (deltaTime != 0.0) ? (static_cast(total_samples - prev_samples) / deltaTime) : 0, + (deltaTime != 0.0) ? ((static_cast(total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0); + fflush (stdout); cycles++; prev_time = time_now; prev_bytes = total_bytes; @@ -193,11 +198,12 @@ void process_samples(dds::sub::DataReader &reader, s } /* Output totals and averages */ - auto deltaTime = static_cast((std::chrono::steady_clock::now()-startTime).count())/1e9; //always 1e9? - std::cout << "\n" << subprefix << "Total received: " << total_samples << " samples, " << total_bytes << " bytes.\n"; - std::cout << subprefix << "Out of order: " << outOfOrder << " samples.\n"; - std::cout << subprefix << std::setprecision(4) << "Average transfer rate: " << static_cast(total_samples) / deltaTime << " samples/s, " - << static_cast(total_bytes) / (deltaTime*BYTES_PER_SEC_TO_MEGABITS_PER_SEC) << " Mbit/s.\n" << std::flush; + auto deltaTime = std::chrono::duration(std::chrono::steady_clock::now() - startTime).count(); + printf ("\nTotal received: %llu samples, %llu bytes\n", total_samples, total_bytes); + printf ("Out of order: %llu samples\n", outOfOrder); + printf ("Average transfer rate: %.2lf samples/s, ", static_cast(total_samples) / deltaTime); + printf ("%.2lf Mbit/s\n", (static_cast(total_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime); + fflush (stdout); } static void sigint (int sig) @@ -272,7 +278,7 @@ int main (int argc, char **argv) reader( subscriber, topic, - dds::sub::qos::DataReaderQos(), + dds::sub::qos::DataReaderQos(tqos), pollingDelay.count() < 0 ? &listener : NULL, pollingDelay.count() < 0 ? dds::core::status::StatusMask::data_available() : dds::core::status::StatusMask::none());