diff --git a/examples/hop/CMakeLists.txt b/examples/hop/CMakeLists.txt index 9a00ed0a..fd5b1291 100644 --- a/examples/hop/CMakeLists.txt +++ b/examples/hop/CMakeLists.txt @@ -20,5 +20,10 @@ if(NOT TARGET CycloneDDS-CXX::ddscxx) endif() idlcxx_generate(TARGET hop_type FILES hop_type.idl) +idlcxx_generate(TARGET mop_type FILES mop_type.idl) + add_executable(hop hop.cpp) target_link_libraries(hop CycloneDDS-CXX::ddscxx hop_type) + +add_executable(mop mop.cpp) +target_link_libraries(mop CycloneDDS-CXX::ddscxx mop_type) diff --git a/examples/hop/mop.cpp b/examples/hop/mop.cpp new file mode 100644 index 00000000..6fded63a --- /dev/null +++ b/examples/hop/mop.cpp @@ -0,0 +1,276 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "dds/dds.hpp" +#include "mop_type.hpp" + +using namespace org::eclipse::cyclonedds; +using namespace std::chrono_literals; + +static bool sleep_between_write = false; +static uint32_t ntopics = 1; +static std::optional pubidx; +static std::optional datafile; + +template +static dds::core::Time mkDDSTime (const std::chrono::time_point x) +{ + int64_t t = std::chrono::duration_cast(x.time_since_epoch()).count(); + return dds::core::Time(t / 1000000000, static_cast(t % 1000000000)); +} + +static volatile std::atomic interrupted = false; +static void sigh(int sig) +{ + static_cast(sig); + interrupted = true; +} + +template +static dds::sub::DataReader make_reader(dds::topic::Topic tp) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector spart{"P"}; + dds::sub::qos::SubscriberQos sqos = dp.default_subscriber_qos() << dds::core::policy::Partition(spart); + dds::sub::Subscriber sub{dp, sqos}; + return dds::sub::DataReader{sub, tp, tp.qos()}; +} + +template +static dds::pub::DataWriter make_writer(dds::topic::Topic tp) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector ppart{"P"}; + dds::pub::qos::PublisherQos pqos = dp.default_publisher_qos() << dds::core::policy::Partition(ppart); + dds::pub::Publisher pub{dp, pqos}; + return dds::pub::DataWriter{pub, tp, tp.qos()}; +} + +template +static void source(std::vector>& tps) +{ + std::vector> wrs; + for (auto tp : tps) + wrs.push_back(make_writer(tp)); + signal(SIGINT, sigh); + T sample{}; + sample.k(static_cast(getpid())); + auto now = std::chrono::high_resolution_clock::now(); + // give forwarders and sink time to start & discovery to run + std::cout << "starting in 1s" << std::endl; + now += 1s; + std::this_thread::sleep_until(now); + while (!interrupted) + { + if (pubidx.has_value()) + { + wrs[pubidx.value()].write(sample, mkDDSTime(std::chrono::high_resolution_clock::now())); + } + else + { + auto nowx = now; + for (auto wr : wrs) + { + wr.write(sample, mkDDSTime(std::chrono::high_resolution_clock::now())); + if (sleep_between_write) + { + nowx += 100us; + std::this_thread::sleep_until(nowx); + } + } + } + ++sample.seq(); + now += 10ms; + std::this_thread::sleep_until(now); + } + std::cout << "wrote " << ntopics << " * " << sample.seq() << " samples" << std::endl; +} + +template +class Sink : public dds::sub::NoOpDataReaderListener { +public: + Sink() = delete; + Sink(size_t idx, std::vector>& lats) : idx_{idx}, lats_{lats} { } + +private: + void on_data_available(dds::sub::DataReader& rd) + { + const auto now = std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + auto xs = rd.take(); + for (const auto& x : xs) { + if (x.info().valid()) { + const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec()); + lats_.push_back(std::make_pair(lat / 1e3, idx_)); + } else { + interrupted = true; + } + }; + } + + size_t idx_; + std::vector>& lats_; +}; + +template +static void sink(std::vector>& tps) +{ + // latencies in microseconds + std::vector> lats; + // read until source disappears + // always create the "junk reader": it costs us nothing if no junk data is being published + { + std::vector> rds; + std::vector> ls; + for (size_t i = 0; i < tps.size(); i++) + ls.push_back(Sink{i, lats}); + for (size_t i = 0; i < tps.size(); i++) + { + rds.push_back(make_reader(tps[i])); + rds[i].listener(&ls[i], dds::core::status::StatusMask::data_available()); + } + while (!interrupted) + { + std::this_thread::sleep_for(103ms); + } + for (auto rd : rds) + { + rd.listener(); + rd.close(); + } + } + // destructors will have run, latencies are ours now + if (datafile.has_value()) + { + std::ofstream f; + f.open(datafile.value()); + for (const auto& l : lats) + f << l.first << " " << l.second << std::endl; + f.close(); + } + const size_t n = lats.size(); + if (n < 2) { + std::cout << "insufficient data" << std::endl; + } else { + std::sort(lats.begin(), lats.end()); + std::cout << "received " << n << " samples; min " << lats[0].first << " max-1 " << lats[n-2].first << " max " << lats[n-1].first << std::endl; + } +} + +enum class Mode { Source, Sink }; + +template +static void run(const Mode mode) +{ + dds::domain::DomainParticipant dp{0}; + auto tpqos = dp.default_topic_qos() + << dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite()) + << dds::core::policy::History::KeepLast(1); + std::vector> tps; + for (uint32_t i = 0; i < ntopics; i++) + tps.push_back(dds::topic::Topic{dp, "Mop" + std::to_string(i), tpqos}); + switch (mode) + { + case Mode::Source: source(tps); break; + case Mode::Sink: sink(tps); break; + } +} + +[[noreturn]] +static void usage() +{ + std::cout + << "usage: mop {source|sink} [OPTIONS] TYPE" << std::endl + << "OPTIONS:" << std::endl + << "-nNTPS use N topics in parallel (def = 1)" << std::endl + << "-pIDX publish only on topic IDX" << std::endl + << "-oFILE write latencies to FILE (sink)" << std::endl + << "-x sleep 100us between successive writes" << std::endl + << "TYPE: one of 8, 128, 1k, 8k, 128k" << std::endl; + std::exit(1); +} + +int main (int argc, char **argv) +{ + if (argc < 2) + usage(); + const std::string modestr = std::string(argv[1]); + Mode mode; + if (modestr == "source") { + mode = Mode::Source; + } else if (modestr == "sink") { + mode = Mode::Sink; + } else { + std::cout << "invalid mode, should be source or sink" << std::endl; + return 1; + } + + optind = 2; + int opt; + while ((opt = getopt (argc, argv, "n:o:p:x")) != EOF) + { + switch (opt) + { + case 'n': + ntopics = static_cast(std::atoi(optarg)); + break; + case 'o': + datafile = std::string(optarg); + break; + case 'p': + pubidx = static_cast(std::atoi(optarg)); + break; + case 'x': + sleep_between_write = true; + break; + default: + usage(); + } + } + if (pubidx.has_value() && pubidx.value() >= ntopics) + { + std::cout << "topic index for publishing out of range" << std::endl; + return 1; + } + if (argc - optind != 1) + { + usage(); + } + const std::string typestr = std::string(argv[optind]); + if (typestr == "8") { + run(mode); + } else if (typestr == "128") { + run(mode); + } else if (typestr == "1k") { + run(mode); + } else if (typestr == "8k") { + run(mode); + } else if (typestr == "128k") { + run(mode); + } else { + std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl; + return 1; + } + return 0; +} diff --git a/examples/hop/mop_type.idl b/examples/hop/mop_type.idl new file mode 100644 index 00000000..d78a7950 --- /dev/null +++ b/examples/hop/mop_type.idl @@ -0,0 +1,42 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +@final @topic +struct Mop8 { + @key uint32 k; + uint32 seq; + //octet z[8 - 8]; +}; +@final @topic +struct Mop128 { + @key uint32 k; + uint32 seq; + octet z[128 - 8]; +}; +@final @topic +struct Mop1k { + @key uint32 k; + uint32 seq; + octet z[1024 - 8]; +}; +@final @topic +struct Mop8k { + @key uint32 k; + uint32 seq; + octet z[8*1024 - 8]; +}; +@final @topic +struct Mop128k { + @key uint32 k; + uint32 seq; + octet z[128*1024 - 8]; +};