From 16560b589787bba76b5487cd79e466aaf278b48f Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 9 Aug 2024 16:19:30 +0200 Subject: [PATCH] Some refinements to "mop" The sinks now record triplets: latency, topic idx and source id. Sources now try to agree on a reference time, then start publishing at configurable delay from that reference time. This allows running two sources publishing at exactly the same time (if you have enough cores), or rather preventing them from publishing at the same time. It adds a run script that does a small 2x2 matrix of tests with two sources and two sinks: starting at the same time or the second delayed by 2ms, and with/without sleep 100us in between samples. It also fixes a potential race condition on appending latencies in the listener. Since the listeners in this experiment always get invoked on the same thread it didn't cause any problems. Signed-off-by: Erik Boasson --- examples/hop/mop.cpp | 260 ++++++++++++++++++++++++++++---------- examples/hop/mop_type.idl | 6 + examples/hop/runexp.bash | 29 +++++ 3 files changed, 225 insertions(+), 70 deletions(-) create mode 100644 examples/hop/runexp.bash diff --git a/examples/hop/mop.cpp b/examples/hop/mop.cpp index 6fded63a..fbe168b7 100644 --- a/examples/hop/mop.cpp +++ b/examples/hop/mop.cpp @@ -28,16 +28,23 @@ using namespace org::eclipse::cyclonedds; using namespace std::chrono_literals; +using namespace std::chrono; +using CLK = high_resolution_clock; + +enum class Type { T8, T128, T1k, T8k, T128k }; + +static Type type = Type::T128; static bool sleep_between_write = false; -static uint32_t ntopics = 1; +static uint32_t keyval = static_cast(getpid()); +static uint32_t ntopics = 10; +static uint32_t stagger = 0; // ms static std::optional pubidx; static std::optional datafile; -template -static dds::core::Time mkDDSTime (const std::chrono::time_point x) +static dds::core::Time mkDDSTime (const time_point x) { - int64_t t = std::chrono::duration_cast(x.time_since_epoch()).count(); + int64_t t = duration_cast(x.time_since_epoch()).count(); return dds::core::Time(t / 1000000000, static_cast(t % 1000000000)); } @@ -68,32 +75,74 @@ static dds::pub::DataWriter make_writer(dds::topic::Topic tp) return dds::pub::DataWriter{pub, tp, tp.qos()}; } +static void make_start_entities (dds::domain::DomainParticipant dp, dds::sub::DataReader& rd, dds::pub::DataWriter& wr) +{ + auto tpqos = dp.default_topic_qos() + << dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite()) + << dds::core::policy::Durability::TransientLocal() + << dds::core::policy::History::KeepLast(1); + auto tp = dds::topic::Topic(dp, "MopSync", tpqos); + rd = make_reader(tp); + wr = make_writer(tp); +} + +static time_point get_start_time(dds::sub::DataReader rd, dds::pub::DataWriter wr) +{ + auto tstart = CLK::now() + 1s; + int64_t tstart_int64 = duration_cast(tstart.time_since_epoch()).count(); + std::cout << keyval << " proposing " << tstart.time_since_epoch().count() << std::endl; + wr << MopSync{keyval, tstart_int64}; + while (CLK::now() < tstart - 2ms) + { + auto ms = rd.take(); + for (const auto& m : ms) + { + if (!m.info().valid()) + continue; + auto prop = time_point(nanoseconds(m.data().tstart())); + if (prop > tstart) + { + tstart = prop; + std::cout << keyval << " updating to " << tstart.time_since_epoch().count() << " from " << m.data().k() << std::endl; + } + } + std::this_thread::sleep_for(1ms); + } + tstart += milliseconds(stagger); + std::cout << keyval << " starting at " << tstart.time_since_epoch().count() << std::endl; + return tstart; +} + template static void source(std::vector>& tps) { + // make entities for synchronised start first, make sure they stay around while + // we measure to avoid disturbing the measurement with the entity deletion and + // associated discovery work + dds::sub::DataReader start_rd = dds::core::null; + dds::pub::DataWriter start_wr = dds::core::null; + make_start_entities(tps[0].domain_participant(), start_rd, start_wr); std::vector> wrs; for (auto tp : tps) wrs.push_back(make_writer(tp)); signal(SIGINT, sigh); + signal(SIGTERM, sigh); T sample{}; - sample.k(static_cast(getpid())); - auto now = std::chrono::high_resolution_clock::now(); - // give forwarders and sink time to start & discovery to run - std::cout << "starting in 1s" << std::endl; - now += 1s; + sample.k(keyval); + auto now = get_start_time (start_rd, start_wr); std::this_thread::sleep_until(now); while (!interrupted) { if (pubidx.has_value()) { - wrs[pubidx.value()].write(sample, mkDDSTime(std::chrono::high_resolution_clock::now())); + wrs[pubidx.value()].write(sample, mkDDSTime(CLK::now())); } else { auto nowx = now; for (auto wr : wrs) { - wr.write(sample, mkDDSTime(std::chrono::high_resolution_clock::now())); + wr.write(sample, mkDDSTime(CLK::now())); if (sleep_between_write) { nowx += 100us; @@ -105,76 +154,89 @@ static void source(std::vector>& tps) now += 10ms; std::this_thread::sleep_until(now); } - std::cout << "wrote " << ntopics << " * " << sample.seq() << " samples" << std::endl; + std::cout << keyval << "wrote " << ntopics << " * " << sample.seq() << " samples" << std::endl; } +// t = reception time, l = latency, i = topic index, k = source key +struct TLK { int64_t t; double l; uint32_t k; }; +struct TLIK { int64_t t; double l; size_t i; uint32_t k; }; +struct LIK { double l; size_t i; uint32_t k; }; + template class Sink : public dds::sub::NoOpDataReaderListener { public: - Sink() = delete; - Sink(size_t idx, std::vector>& lats) : idx_{idx}, lats_{lats} { } + Sink() = default; + + const std::vector& lats() const { + return lats_; + }; private: void on_data_available(dds::sub::DataReader& rd) { - const auto now = std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + const auto now_clk = CLK::now(); + const int64_t now = duration_cast(now_clk.time_since_epoch()).count(); auto xs = rd.take(); for (const auto& x : xs) { if (x.info().valid()) { const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec()); - lats_.push_back(std::make_pair(lat / 1e3, idx_)); + lats_.push_back(TLK{now, lat / 1e3, x.data().k()}); } else { interrupted = true; } }; } - size_t idx_; - std::vector>& lats_; + std::vector lats_; }; template static void sink(std::vector>& tps) { // latencies in microseconds - std::vector> lats; + std::vector lats; // read until source disappears - // always create the "junk reader": it costs us nothing if no junk data is being published { std::vector> rds; std::vector> ls; for (size_t i = 0; i < tps.size(); i++) - ls.push_back(Sink{i, lats}); + ls.push_back(Sink{}); for (size_t i = 0; i < tps.size(); i++) - { rds.push_back(make_reader(tps[i])); + for (size_t i = 0; i < tps.size(); i++) rds[i].listener(&ls[i], dds::core::status::StatusMask::data_available()); - } while (!interrupted) - { std::this_thread::sleep_for(103ms); - } for (auto rd : rds) - { - rd.listener(); rd.close(); - } + // collect latencies for all topics and sort by reception time + std::vector tlats; + for (size_t i = 0; i < ls.size(); i++) + for (const auto& x : ls[i].lats()) + tlats.push_back(TLIK{x.t, x.l, i, x.k}); + std::sort(tlats.begin(), tlats.end(), [](const TLIK& a, const TLIK& b) -> bool { return a.t < b.t; }); + // then reduce to just latency, topic and key + for (const auto& x : tlats) + lats.push_back(LIK{x.l, x.i, x.k}); } - // destructors will have run, latencies are ours now if (datafile.has_value()) { std::ofstream f; f.open(datafile.value()); for (const auto& l : lats) - f << l.first << " " << l.second << std::endl; + f << l.l << " " << l.i << " " << l.k << std::endl; f.close(); } const size_t n = lats.size(); if (n < 2) { std::cout << "insufficient data" << std::endl; } else { - std::sort(lats.begin(), lats.end()); - std::cout << "received " << n << " samples; min " << lats[0].first << " max-1 " << lats[n-2].first << " max " << lats[n-1].first << std::endl; + std::sort(lats.begin(), lats.end(), [](const LIK& a, const LIK& b) -> bool { return a.l < b.l; }); + std::cout + << "received " << n + << " samples; min " << lats[0].l + << " max-1 " << lats[n-2].l + << " max " << lats[n-1].l << std::endl; } } @@ -201,16 +263,58 @@ static void run(const Mode mode) static void usage() { std::cout - << "usage: mop {source|sink} [OPTIONS] TYPE" << std::endl - << "OPTIONS:" << std::endl - << "-nNTPS use N topics in parallel (def = 1)" << std::endl + << "usage: mop {source|sink} [OPTIONS]" << std::endl + << std::endl + << "COMMON OPTIONS:" << std::endl + << "-tTYPE type to use one of 8, 128 (def), 1k, 8k, 128k" << std::endl + << "-nNTPS use N (def = 10) topics in parallel" << std::endl + << std::endl + << "SOURCE OPTIONS:" << std::endl + << "-kKVAL use KVAL as key value instead of process id" << std::endl << "-pIDX publish only on topic IDX" << std::endl - << "-oFILE write latencies to FILE (sink)" << std::endl + << "-sDELAY stagger: offset by DELAY ms (def = 0)" << std::endl << "-x sleep 100us between successive writes" << std::endl - << "TYPE: one of 8, 128, 1k, 8k, 128k" << std::endl; + << std::endl + << "SINK OPTIONS:" << std::endl + << "-oFILE write latencies to FILE" << std::endl; std::exit(1); } +static Type convert_typestr (const std::string& typestr) +{ + if (typestr == "8") { + return Type::T8; + } else if (typestr == "128") { + return Type::T128; + } else if (typestr == "1k") { + return Type::T1k; + } else if (typestr == "8k") { + return Type::T8k; + } else if (typestr == "128k") { + return Type::T128k; + } else { + std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl; + std::exit(1); + return Type::T128; + } +} + +static bool handle_common_opt (int opt) +{ + switch (opt) + { + case 'n': + ntopics = static_cast(std::atoi(optarg)); + return true; + case 't': + type = convert_typestr(std::string(optarg)); + return true; + default: + // not a common option + return false; + } +} + int main (int argc, char **argv) { if (argc < 2) @@ -226,51 +330,67 @@ int main (int argc, char **argv) return 1; } + const std::string common_opt = "n:t:"; optind = 2; int opt; - while ((opt = getopt (argc, argv, "n:o:p:x")) != EOF) + switch (mode) { - switch (opt) - { - case 'n': - ntopics = static_cast(std::atoi(optarg)); - break; - case 'o': - datafile = std::string(optarg); - break; - case 'p': - pubidx = static_cast(std::atoi(optarg)); - break; - case 'x': - sleep_between_write = true; - break; - default: - usage(); - } + case Mode::Source: + while ((opt = getopt (argc, argv, (common_opt + "k:p:s:x").c_str())) != EOF) + { + if (handle_common_opt (opt)) + continue; + switch (opt) + { + case 'k': + keyval = static_cast(std::atoi(optarg)); + break; + case 'p': + pubidx = static_cast(std::atoi(optarg)); + break; + case 's': + stagger = static_cast(std::atoi(optarg)); + break; + case 'x': + sleep_between_write = true; + break; + default: + usage(); + } + } + break; + case Mode::Sink: + while ((opt = getopt (argc, argv, (common_opt + "o:").c_str())) != EOF) + { + if (handle_common_opt (opt)) + continue; + switch (opt) + { + case 'o': + datafile = std::string(optarg); + break; + default: + usage(); + } + } + break; } if (pubidx.has_value() && pubidx.value() >= ntopics) { std::cout << "topic index for publishing out of range" << std::endl; return 1; } - if (argc - optind != 1) + if (argc - optind != 0) { usage(); } - const std::string typestr = std::string(argv[optind]); - if (typestr == "8") { - run(mode); - } else if (typestr == "128") { - run(mode); - } else if (typestr == "1k") { - run(mode); - } else if (typestr == "8k") { - run(mode); - } else if (typestr == "128k") { - run(mode); - } else { - std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl; - return 1; + switch (type) + { + case Type::T8: run(mode); break; + case Type::T128: run(mode); break; + case Type::T1k: run(mode); break; + case Type::T8k: run(mode); break; + case Type::T128k: run(mode); break; } return 0; } diff --git a/examples/hop/mop_type.idl b/examples/hop/mop_type.idl index d78a7950..220398da 100644 --- a/examples/hop/mop_type.idl +++ b/examples/hop/mop_type.idl @@ -10,6 +10,12 @@ * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause */ +@final @topic +struct MopSync { + @key uint32 k; + int64 tstart; +}; + @final @topic struct Mop8 { @key uint32 k; diff --git a/examples/hop/runexp.bash b/examples/hop/runexp.bash new file mode 100644 index 00000000..5ad38812 --- /dev/null +++ b/examples/hop/runexp.bash @@ -0,0 +1,29 @@ +# using default 10 topics, 128 bytes + +# first arg: part of filename of output file +# second arg: if empty, no stagger; if non-empty: stagger by 1ms +# remaining args: passed on to source +run () { + logname=$1 ; shift + stagger=$1 ; shift + + # writing all this junk keeps my CPU awake (macOS, M1) + bin/hop sink -j100000 0 128 & + bin/hop source -j100000 0 128 & kpids=$! + sleep 2 + for i in {0..1} ; do + bin/mop sink -o mop-$logname-$i.txt & + done + for i in {0..1} ; do + s=$((2 * i)) + bin/mop source -k$i ${stagger:+-s$s} "$@" & kpids="$kpids $!" + done + sleep 10 + kill -INT $kpids + wait +} + +run s0x0 "" # both sources start at same time, no sleep between writes +run s0x1 "" -x # both sources start at same time, 100us sleep between writes +run s1x0 "s" # source 1 starts 1ms after source 1, no sleep between writes +run s1x1 "s" -x # source 1 starts 1ms after source 1, 100us sleep between writes