Skip to content

Commit

Permalink
Throughput example fixes (#404)
Browse files Browse the repository at this point in the history
* CP-298

Fixed the output formatting of the throughput example programs to
match that of the CycloneDDS-C throughput examples

Signed-off-by: Martijn Reicher <[email protected]>

* CP-301

Removed output dots

Signed-off-by: Martijn Reicher <[email protected]>

* CP-302

Set batched writes on throughput example programs

Signed-off-by: Martijn Reicher <[email protected]>

* CP-300

Fixed default parameters on throughput example programs to match
those on CycloneDDS-C

Signed-off-by: Martijn Reicher <[email protected]>

* CP-299

Fixed QoS on throughput example programs

Signed-off-by: Martijn Reicher <[email protected]>

* Fix type conversion warning

Windows does not like converting from size_t to unsigned long, so
made that conversion explicit

Signed-off-by: Martijn Reicher <[email protected]>

* Fix rate calculation

Signed-off-by: Erik Boasson <[email protected]>

* Throughput example: only flush if burstInterval>0

Signed-off-by: Erik Boasson <[email protected]>

---------

Signed-off-by: Martijn Reicher <[email protected]>
Signed-off-by: Erik Boasson <[email protected]>
Co-authored-by: Erik Boasson <[email protected]>
  • Loading branch information
reicheratwork and eboasson authored Jun 9, 2023
1 parent c64812f commit 02abba3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
32 changes: 13 additions & 19 deletions examples/throughput/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ static volatile sig_atomic_t done(false); /*semaphore for keeping track of wheth

static uint32_t payloadSize(8192); /*size of the payload of the sent messages*/

static std::chrono::milliseconds burstInterval(100); /*interval between bursts of messages*/
static std::chrono::milliseconds burstInterval(0); /*interval between bursts of messages*/

static uint32_t burstSize(10); /*number of messages to send each burst*/
static uint32_t burstSize(1); /*number of messages to send each burst*/

static std::chrono::seconds timeOut(30); /*timeout before the writer will give up*/
static std::chrono::seconds timeOut(0); /*timeout before the writer will give up*/

static std::string partitionName("Throughput example"); /*name of the domain on which the throughput test is run*/

Expand Down Expand Up @@ -117,15 +117,11 @@ bool wait_for_reader(dds::pub::DataWriter<ThroughputModule::DataType> &writer)

void start_writing(
dds::pub::DataWriter<ThroughputModule::DataType> &writer,
ThroughputModule::DataType &sample,
std::chrono::milliseconds &d_int,
uint32_t b_size,
std::chrono::seconds &t_out)
ThroughputModule::DataType &sample)
{
bool timedOut = false;

auto pubStart = std::chrono::steady_clock::now();
auto reportstart = pubStart;

if (!done)
{
Expand All @@ -134,7 +130,7 @@ void start_writing(
while (!done && !timedOut)
{
auto burstStart = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < b_size; i++)
for (uint32_t i = 0; i < burstSize; i++)
{
try {
writer.write(sample);
Expand All @@ -150,17 +146,15 @@ void start_writing(
sample.count()++;
}

writer->write_flush();
std::this_thread::sleep_until(burstStart + d_int);
if (burstInterval > std::chrono::seconds(0)) {
writer->write_flush();
std::this_thread::sleep_until(burstStart + burstInterval);
}

auto n = std::chrono::steady_clock::now();
if (t_out > std::chrono::milliseconds(0) &&
n > pubStart+t_out)
{
if (timeOut > std::chrono::milliseconds(0) &&
n > pubStart+timeOut) {
timedOut = true;
} else if (reportstart + std::chrono::seconds(1) < n) {
std::cout << "." << std::flush;
reportstart = n;
}

if (writer.publication_matched_status().current_count() == 0) {
Expand Down Expand Up @@ -196,7 +190,7 @@ int main (int argc, char **argv)

dds::pub::Publisher publisher(participant, pqos);

dds::pub::qos::DataWriterQos wqos;
dds::pub::qos::DataWriterQos wqos(tqos);
wqos << dds::core::policy::WriterBatching::BatchUpdates();

dds::pub::DataWriter<ThroughputModule::DataType> writer(publisher, topic, wqos);
Expand All @@ -211,7 +205,7 @@ int main (int argc, char **argv)
signal (SIGINT, sigint);

/* Register the sample instance and write samples repeatedly or until time out */
start_writing(writer, sample, burstInterval, burstSize, timeOut);
start_writing(writer, sample);

/* Cleanup */
writer.dispose_instance(sample);
Expand Down
30 changes: 18 additions & 12 deletions examples/throughput/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ static unsigned long long outOfOrder(0); /*keeps track of out of order samples*/

static unsigned long long total_bytes(0); /*keeps track of total bytes received*/

static unsigned long payloadSize(0); /*size of the last payload received*/

static unsigned long long total_samples(0); /*keeps track of total samples received*/

static std::chrono::milliseconds pollingDelay(-1); /*default is a listener*/
Expand Down Expand Up @@ -100,7 +102,8 @@ unsigned long long do_take(dds::sub::DataReader<ThroughputModule::DataType>& rd)
outOfOrder++;

valid_samples++;
total_bytes += s.data().payload().size();
payloadSize = static_cast<unsigned long>(s.data().payload().size());
total_bytes += payloadSize;
it->second = ct+1;
}

Expand Down Expand Up @@ -179,11 +182,13 @@ void process_samples(dds::sub::DataReader<ThroughputModule::DataType> &reader, s
if (time_now > prev_time + std::chrono::seconds(1) && total_samples != prev_samples)
{
/* Output intermediate statistics */
auto deltaTime = static_cast<double>((time_now-prev_time).count())/1e9; //always 1e9?
std::cout << subprefix << "Received " << total_samples-prev_samples << " samples totalling " << total_bytes-prev_bytes << " bytes in "
<< std::setprecision(4) << deltaTime << " seconds " << "| Rates: " << static_cast<double>(total_samples - prev_samples) / deltaTime
<< " samples/s, " << static_cast<double>(total_bytes - prev_bytes) / (deltaTime*BYTES_PER_SEC_TO_MEGABITS_PER_SEC) << " Mbit/s, with "
<< outOfOrder << " samples received out of order.\n" << std::flush;
auto deltaTime = std::chrono::duration<double>(time_now - prev_time).count();
printf ("=== [Subscriber] %5.3f Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples "
"Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n",
deltaTime, payloadSize, total_samples, total_bytes, outOfOrder,
(deltaTime != 0.0) ? (static_cast<double>(total_samples - prev_samples) / deltaTime) : 0,
(deltaTime != 0.0) ? ((static_cast<double>(total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0);
fflush (stdout);
cycles++;
prev_time = time_now;
prev_bytes = total_bytes;
Expand All @@ -193,11 +198,12 @@ void process_samples(dds::sub::DataReader<ThroughputModule::DataType> &reader, s
}

/* Output totals and averages */
auto deltaTime = static_cast<double>((std::chrono::steady_clock::now()-startTime).count())/1e9; //always 1e9?
std::cout << "\n" << subprefix << "Total received: " << total_samples << " samples, " << total_bytes << " bytes.\n";
std::cout << subprefix << "Out of order: " << outOfOrder << " samples.\n";
std::cout << subprefix << std::setprecision(4) << "Average transfer rate: " << static_cast<double>(total_samples) / deltaTime << " samples/s, "
<< static_cast<double>(total_bytes) / (deltaTime*BYTES_PER_SEC_TO_MEGABITS_PER_SEC) << " Mbit/s.\n" << std::flush;
auto deltaTime = std::chrono::duration<double>(std::chrono::steady_clock::now() - startTime).count();
printf ("\nTotal received: %llu samples, %llu bytes\n", total_samples, total_bytes);
printf ("Out of order: %llu samples\n", outOfOrder);
printf ("Average transfer rate: %.2lf samples/s, ", static_cast<double>(total_samples) / deltaTime);
printf ("%.2lf Mbit/s\n", (static_cast<double>(total_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime);
fflush (stdout);
}

static void sigint (int sig)
Expand Down Expand Up @@ -272,7 +278,7 @@ int main (int argc, char **argv)
reader(
subscriber,
topic,
dds::sub::qos::DataReaderQos(),
dds::sub::qos::DataReaderQos(tqos),
pollingDelay.count() < 0 ? &listener : NULL,
pollingDelay.count() < 0 ? dds::core::status::StatusMask::data_available() : dds::core::status::StatusMask::none());

Expand Down

0 comments on commit 02abba3

Please sign in to comment.