From b52395879cd1a245f8072b9879b883cbbf5538f2 Mon Sep 17 00:00:00 2001 From: Darryl Masson Date: Tue, 1 Sep 2020 15:53:01 +0200 Subject: [PATCH] Tweaked benchmarks --- Options.cc | 41 ++++++++++++++--------------- Options.hh | 3 ++- StraxInserter.cc | 68 ++++++++++++++++++++++++++---------------------- StraxInserter.hh | 6 ++--- 4 files changed, 61 insertions(+), 57 deletions(-) diff --git a/Options.cc b/Options.cc index 84d36a67..a60aca23 100644 --- a/Options.cc +++ b/Options.cc @@ -382,9 +382,8 @@ void Options::UpdateDAC(std::map> return; } -void Options::SaveBenchmarks(std::map& byte_counter, - std::map& buffer_counter, - double proc_time_dp_us, double proc_time_ev_us, double proc_time_ch_us, double comp_time_us) { +void Options::SaveBenchmarks(std::map& counters, long bytes, + std::string sid, std::map& times) { using namespace bsoncxx::builder::stream; int level = GetInt("benchmark_level", 2); if (level == 0) return; @@ -393,15 +392,16 @@ void Options::SaveBenchmarks(std::map& byte_counter, run_id = std::stoi(GetString("run_identifier", "latest")); } catch (...) { } - std::map bc; + std::map> _counters; if (level == 2) { - for (const auto& p : buffer_counter) - if (p.first != 0) - bc[int(std::ceil(std::log2(p.first)))] += p.second; - else - bc[-1] += p.second; + for (const auto& p : counters) + for (const auto& pp : p.second) + if (pp.first != 0) + _counters[p.first][int(std::floor(std::log2(pp.first)))] += pp.second; + else + _counters[p.first][-1] += pp.second; } else if (level == 3) { - bc = buffer_counter; + _counters = counters; } auto search_doc = document{} << "run" << run_id << finalize; @@ -409,20 +409,17 @@ void Options::SaveBenchmarks(std::map& byte_counter, update_doc << "$set" << open_document << "run" << run_id << close_document; update_doc << "$push" << open_document << "data" << open_document; update_doc << "host" << fHostname; - update_doc << "bytes" << byte_counter["bytes"]; - update_doc << "fragments" << byte_counter["fragments"]; - update_doc << "events" << byte_counter["events"]; - update_doc << "data_packets" << byte_counter["data_packets"]; - update_doc << "processing_time_dp_us" << proc_time_dp_us; - update_doc << "processing_time_ev_us" << proc_time_ev_us; - update_doc << "processing_time_ch_us" << proc_time_ch_us; - update_doc << "compression_time_us" << comp_time_us; + update_doc << "id" << sid; + update_doc << "bytes" << bytes; + for (auto& p : times) + update_doc << p.first << p.second; if (level >= 2) { - update_doc << "buffer_xfers" << open_document; - for (auto& p : bc) { - update_doc << std::to_string(p.first) << p.second; + for (auto& p : _counters) { + update_doc << p.first << open_document; + for (auto& pp : p.second) + update_doc << std::to_string(pp.first) << pp.second; + update_doc << close_document; } - update_doc << close_document; // buffer xfers } update_doc << close_document; // data diff --git a/Options.hh b/Options.hh index df8f97f5..d5204295 100644 --- a/Options.hh +++ b/Options.hh @@ -78,7 +78,8 @@ public: std::vector GetThresholds(int); void UpdateDAC(std::map>>&); - void SaveBenchmarks(std::map&, std::map&, double, double, double, double); + void SaveBenchmarks(std::map>&, long, std::string, + std::map&); private: int Load(std::string, mongocxx::collection&, std::string); diff --git a/StraxInserter.cc b/StraxInserter.cc index 8eeaa737..80aae00b 100644 --- a/StraxInserter.cc +++ b/StraxInserter.cc @@ -41,8 +41,6 @@ StraxInserter::StraxInserter(){ fFragmentSize = 0; fForceQuit = false; fFullChunkLength = fChunkLength+fChunkOverlap; - fFragmentsProcessed = 0; - fEventsProcessed = 0; fProcTimeDP = fProcTimeEv = fProcTimeCh = fCompTime = 0.; } @@ -74,17 +72,21 @@ StraxInserter::~StraxInserter(){ fLog->Entry(MongoLog::Message, "Still waiting for thread %lx to stop", fThreadId); std::this_thread::sleep_for(std::chrono::seconds(2)); } - long total_dps = std::accumulate(fBufferCounter.begin(), fBufferCounter.end(), 0L, - [&](long tot, auto& p){return std::move(tot) + p.second;}); - fLog->Entry(MongoLog::Local, "Thread %lx got events %.1f%% of the time", - fThreadId, (total_dps-fBufferCounter[0]+0.0)/total_dps*100.); - std::map counters { - {"bytes", fBytesProcessed}, - {"fragments", fFragmentsProcessed}, - {"events", fEventsProcessed}, - {"data_packets", total_dps - fBufferCounter[0]}}; - fOptions->SaveBenchmarks(counters, fBufferCounter, - fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime); + auto accum = [&](long tot, std::pair it){return std::move(tot) + pair.second;}; + std::stringstream ss; + ss << std::hex << fThreadId; + std::map times { + {"data_packets_us", fProcTimeDP}, + {"events_us", fProcTimeEv}, + {"fragments_us", fProcTimeCh}, + {"compression_us", fCompTime} + }; + std::map> counters { + {"fragments", fFragsPerEvent}, + {"events", fEvPerDP}, + {"data_packets", fBufferCounter} + }; + fOptions->SaveBenchmarks(counters, fBytesProcessed, ss.str(), times); } int StraxInserter::Initialize(Options *options, MongoLog *log, DAQController *dataSource, @@ -166,9 +168,10 @@ void StraxInserter::ProcessDatapacket(data_packet* dp){ // Take a buffer and break it up into one document per channel - u_int32_t *buff = dp->buff; - u_int32_t idx = 0; - unsigned total_words = dp->size/sizeof(u_int32_t); + uint32_t *buff = dp->buff; + uint32_t idx = 0; + unsigned total_words = dp->size/sizeof(uint32_t); + int evs_this_dp(0); clock_gettime(CLOCK_THREAD_CPUTIME_ID, &dp_start); while(idx < total_words && fForceQuit == false){ @@ -177,12 +180,14 @@ void StraxInserter::ProcessDatapacket(data_packet* dp){ idx += ProcessEvent(buff+idx, total_words-idx, dp->clock_counter, dp->header_time, dp->bid); clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ev_end); fProcTimeEv += timespec_subtract(ev_end, ev_start); + evs_this_dp++; } else idx++; } clock_gettime(CLOCK_THREAD_CPUTIME_ID, &dp_end); fProcTimeDP += timespec_subtract(dp_end, dp_start); fBytesProcessed += dp->size; + fEvPerDP[evs_this_dp]++; delete dp; } @@ -193,17 +198,16 @@ uint32_t StraxInserter::ProcessEvent(uint32_t* buff, unsigned total_words, long struct timespec ch_start, ch_end; std::map fmt = fFmt[bid]; - u_int32_t words_in_event = std::min(buff[0]&0xFFFFFFF, total_words); + uint32_t words_in_event = std::min(buff[0]&0xFFFFFFF, total_words); if (words_in_event < (buff[0]&0xFFFFFFF)) { fLog->Entry(MongoLog::Local, "Board %i garbled event header: %x/%x", bid, buff[0]&0xFFFFFFF, total_words); } - u_int32_t channel_mask = (buff[1]&0xFF); + uint32_t channel_mask = (buff[1]&0xFF); if (fmt["channel_mask_msb_idx"] != -1) channel_mask |= ( ((buff[2]>>24)&0xFF)<<8); - u_int32_t event_time = buff[3]&0x7FFFFFFF; - fEventsProcessed++; + uint32_t event_time = buff[3]&0x7FFFFFFF; if(buff[1]&0x4000000){ // board fail const std::lock_guard lg(fFC_mutex); @@ -216,12 +220,13 @@ uint32_t StraxInserter::ProcessEvent(uint32_t* buff, unsigned total_words, long unsigned idx = event_header_words; int ret; + int frags(0); for(unsigned ch=0; ch(channel_mask).count(); - u_int32_t channel_words = (words_in_event-event_header_words) / channels_in_event; + uint32_t channel_words = (words_in_event-event_header_words) / channels_in_event; long channel_time = event_time; long channel_timeMSB = clock_counter<<31; - u_int16_t baseline_ch = 0; + uint16_t baseline_ch = 0; std::map fmt = fFmt[bid]; // Presence of a channel header indicates non-default firmware (DPP-DAW) so override @@ -294,9 +300,9 @@ int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int b } } - u_int16_t *payload = reinterpret_cast(buff+fmt["channel_header_words"]); - u_int32_t samples_in_pulse = (channel_words-fmt["channel_header_words"])<<1; - u_int16_t sw = fmt["ns_per_sample"]; + uint16_t *payload = reinterpret_cast(buff+fmt["channel_header_words"]); + uint32_t samples_in_pulse = (channel_words-fmt["channel_header_words"])<<1; + uint16_t sw = fmt["ns_per_sample"]; int samples_per_fragment = fFragmentBytes>>1; int16_t cl = fOptions->GetChannel(bid, channel); // Failing to discern which channel we're getting data from seems serious enough to throw @@ -304,17 +310,17 @@ int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int b throw std::runtime_error("Failed to parse channel map. I'm gonna just kms now."); int num_frags = std::ceil(1.*samples_in_pulse/samples_per_fragment); + frags += num_frags; for (uint16_t frag_i = 0; frag_i < num_frags; frag_i++) { std::string fragment; fragment.reserve(fFragmentBytes + fStraxHeaderSize); // How long is this fragment? - u_int32_t samples_this_fragment = samples_per_fragment; + uint32_t samples_this_fragment = samples_per_fragment; if (frag_i == num_frags-1) samples_this_fragment = samples_in_pulse - frag_i*samples_per_fragment; - fFragmentsProcessed++; - u_int64_t time_this_fragment = Time64 + samples_per_fragment*sw*frag_i; + int64_t time_this_fragment = Time64 + samples_per_fragment*sw*frag_i; fragment.append((char*)&time_this_fragment, sizeof(time_this_fragment)); fragment.append((char*)&samples_this_fragment, sizeof(samples_this_fragment)); fragment.append((char*)&sw, sizeof(sw)); @@ -327,7 +333,7 @@ int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int b fragment.append((char*)(payload + frag_i*samples_per_fragment), samples_this_fragment*2); uint16_t zero_filler = 0; while((int)fragment.size() fDataPerChan; std::mutex fDPC_mutex; std::map fBufferCounter; + std::map fFragsPerEvent; + std::map fEvPerDP; std::atomic_int fBufferLength; long fBytesProcessed; - long fFragmentsProcessed; - long fEventsProcessed; double fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime; std::thread::id fThreadId;