Skip to content

Commit

Permalink
Tweaked benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
darrylmasson committed Sep 1, 2020
1 parent 0a07ac6 commit b523958
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 57 deletions.
41 changes: 19 additions & 22 deletions Options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,8 @@ void Options::UpdateDAC(std::map<int, std::map<std::string, std::vector<double>>
return;
}

void Options::SaveBenchmarks(std::map<std::string, long>& byte_counter,
std::map<int, long>& buffer_counter,
double proc_time_dp_us, double proc_time_ev_us, double proc_time_ch_us, double comp_time_us) {
void Options::SaveBenchmarks(std::map<std::string, std::map<int, long>& counters, long bytes,
std::string sid, std::map<std::string, double>& times) {
using namespace bsoncxx::builder::stream;
int level = GetInt("benchmark_level", 2);
if (level == 0) return;
Expand All @@ -393,36 +392,34 @@ void Options::SaveBenchmarks(std::map<std::string, long>& byte_counter,
run_id = std::stoi(GetString("run_identifier", "latest"));
} catch (...) {
}
std::map<int, long> bc;
std::map<std::string, std::map<int, long>> _counters;
if (level == 2) {
for (const auto& p : buffer_counter)
if (p.first != 0)
bc[int(std::ceil(std::log2(p.first)))] += p.second;
else
bc[-1] += p.second;
for (const auto& p : counters)
for (const auto& pp : p.second)
if (pp.first != 0)
_counters[p.first][int(std::floor(std::log2(pp.first)))] += pp.second;
else
_counters[p.first][-1] += pp.second;
} else if (level == 3) {
bc = buffer_counter;
_counters = counters;
}

auto search_doc = document{} << "run" << run_id << finalize;
auto update_doc = document{};
update_doc << "$set" << open_document << "run" << run_id << close_document;
update_doc << "$push" << open_document << "data" << open_document;
update_doc << "host" << fHostname;
update_doc << "bytes" << byte_counter["bytes"];
update_doc << "fragments" << byte_counter["fragments"];
update_doc << "events" << byte_counter["events"];
update_doc << "data_packets" << byte_counter["data_packets"];
update_doc << "processing_time_dp_us" << proc_time_dp_us;
update_doc << "processing_time_ev_us" << proc_time_ev_us;
update_doc << "processing_time_ch_us" << proc_time_ch_us;
update_doc << "compression_time_us" << comp_time_us;
update_doc << "id" << sid;
update_doc << "bytes" << bytes;
for (auto& p : times)
update_doc << p.first << p.second;
if (level >= 2) {
update_doc << "buffer_xfers" << open_document;
for (auto& p : bc) {
update_doc << std::to_string(p.first) << p.second;
for (auto& p : _counters) {
update_doc << p.first << open_document;
for (auto& pp : p.second)
update_doc << std::to_string(pp.first) << pp.second;
update_doc << close_document;
}
update_doc << close_document; // buffer xfers
}

update_doc << close_document; // data
Expand Down
3 changes: 2 additions & 1 deletion Options.hh
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public:
std::vector<uint16_t> GetThresholds(int);

void UpdateDAC(std::map<int, std::map<std::string, std::vector<double>>>&);
void SaveBenchmarks(std::map<std::string, long>&, std::map<int, long>&, double, double, double, double);
void SaveBenchmarks(std::map<std::string, std::map<int, long>>&, long, std::string,
std::map<std::string, double>&);

private:
int Load(std::string, mongocxx::collection&, std::string);
Expand Down
68 changes: 37 additions & 31 deletions StraxInserter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ StraxInserter::StraxInserter(){
fFragmentSize = 0;
fForceQuit = false;
fFullChunkLength = fChunkLength+fChunkOverlap;
fFragmentsProcessed = 0;
fEventsProcessed = 0;
fProcTimeDP = fProcTimeEv = fProcTimeCh = fCompTime = 0.;
}

Expand Down Expand Up @@ -74,17 +72,21 @@ StraxInserter::~StraxInserter(){
fLog->Entry(MongoLog::Message, "Still waiting for thread %lx to stop", fThreadId);
std::this_thread::sleep_for(std::chrono::seconds(2));
}
long total_dps = std::accumulate(fBufferCounter.begin(), fBufferCounter.end(), 0L,
[&](long tot, auto& p){return std::move(tot) + p.second;});
fLog->Entry(MongoLog::Local, "Thread %lx got events %.1f%% of the time",
fThreadId, (total_dps-fBufferCounter[0]+0.0)/total_dps*100.);
std::map<std::string, long> counters {
{"bytes", fBytesProcessed},
{"fragments", fFragmentsProcessed},
{"events", fEventsProcessed},
{"data_packets", total_dps - fBufferCounter[0]}};
fOptions->SaveBenchmarks(counters, fBufferCounter,
fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime);
auto accum = [&](long tot, std::pair<int, long> it){return std::move(tot) + pair.second;};
std::stringstream ss;
ss << std::hex << fThreadId;
std::map<std::string, double> times {
{"data_packets_us", fProcTimeDP},
{"events_us", fProcTimeEv},
{"fragments_us", fProcTimeCh},
{"compression_us", fCompTime}
};
std::map<std::string, std::map<int, long>> counters {
{"fragments", fFragsPerEvent},
{"events", fEvPerDP},
{"data_packets", fBufferCounter}
};
fOptions->SaveBenchmarks(counters, fBytesProcessed, ss.str(), times);
}

int StraxInserter::Initialize(Options *options, MongoLog *log, DAQController *dataSource,
Expand Down Expand Up @@ -166,9 +168,10 @@ void StraxInserter::ProcessDatapacket(data_packet* dp){

// Take a buffer and break it up into one document per channel

u_int32_t *buff = dp->buff;
u_int32_t idx = 0;
unsigned total_words = dp->size/sizeof(u_int32_t);
uint32_t *buff = dp->buff;
uint32_t idx = 0;
unsigned total_words = dp->size/sizeof(uint32_t);
int evs_this_dp(0);
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &dp_start);
while(idx < total_words && fForceQuit == false){

Expand All @@ -177,12 +180,14 @@ void StraxInserter::ProcessDatapacket(data_packet* dp){
idx += ProcessEvent(buff+idx, total_words-idx, dp->clock_counter, dp->header_time, dp->bid);
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ev_end);
fProcTimeEv += timespec_subtract(ev_end, ev_start);
evs_this_dp++;
} else
idx++;
}
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &dp_end);
fProcTimeDP += timespec_subtract(dp_end, dp_start);
fBytesProcessed += dp->size;
fEvPerDP[evs_this_dp]++;
delete dp;
}

Expand All @@ -193,17 +198,16 @@ uint32_t StraxInserter::ProcessEvent(uint32_t* buff, unsigned total_words, long
struct timespec ch_start, ch_end;
std::map<std::string, int> fmt = fFmt[bid];

u_int32_t words_in_event = std::min(buff[0]&0xFFFFFFF, total_words);
uint32_t words_in_event = std::min(buff[0]&0xFFFFFFF, total_words);
if (words_in_event < (buff[0]&0xFFFFFFF)) {
fLog->Entry(MongoLog::Local, "Board %i garbled event header: %x/%x",
bid, buff[0]&0xFFFFFFF, total_words);
}

u_int32_t channel_mask = (buff[1]&0xFF);
uint32_t channel_mask = (buff[1]&0xFF);
if (fmt["channel_mask_msb_idx"] != -1) channel_mask |= ( ((buff[2]>>24)&0xFF)<<8);

u_int32_t event_time = buff[3]&0x7FFFFFFF;
fEventsProcessed++;
uint32_t event_time = buff[3]&0x7FFFFFFF;

if(buff[1]&0x4000000){ // board fail
const std::lock_guard<std::mutex> lg(fFC_mutex);
Expand All @@ -216,32 +220,34 @@ uint32_t StraxInserter::ProcessEvent(uint32_t* buff, unsigned total_words, long

unsigned idx = event_header_words;
int ret;
int frags(0);

for(unsigned ch=0; ch<max_channels; ch++){
if (channel_mask & (1<<ch)) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ch_start);
ret = ProcessChannel(buff+idx, words_in_event, bid, ch, header_time, event_time,
clock_counter, channel_mask);
clock_counter, channel_mask, frags);
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ch_end);
fProcTimeCh += timespec_subtract(ch_end, ch_start);
if (ret == -1)
break;
idx += ret;
}
}
fFragsPerEvent[frags]++;
return idx;
}

int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int bid, int channel,
uint32_t header_time, uint32_t event_time, long clock_counter, int channel_mask) {
uint32_t header_time, uint32_t event_time, long clock_counter, int channel_mask, int& frags) {
// buff points to the first word of the channel's data

// These defaults are valid for 'default' firmware where all channels are the same size
int channels_in_event = std::bitset<max_channels>(channel_mask).count();
u_int32_t channel_words = (words_in_event-event_header_words) / channels_in_event;
uint32_t channel_words = (words_in_event-event_header_words) / channels_in_event;
long channel_time = event_time;
long channel_timeMSB = clock_counter<<31;
u_int16_t baseline_ch = 0;
uint16_t baseline_ch = 0;
std::map<std::string, int> fmt = fFmt[bid];

// Presence of a channel header indicates non-default firmware (DPP-DAW) so override
Expand Down Expand Up @@ -294,27 +300,27 @@ int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int b
}
}

u_int16_t *payload = reinterpret_cast<u_int16_t*>(buff+fmt["channel_header_words"]);
u_int32_t samples_in_pulse = (channel_words-fmt["channel_header_words"])<<1;
u_int16_t sw = fmt["ns_per_sample"];
uint16_t *payload = reinterpret_cast<uint16_t*>(buff+fmt["channel_header_words"]);
uint32_t samples_in_pulse = (channel_words-fmt["channel_header_words"])<<1;
uint16_t sw = fmt["ns_per_sample"];
int samples_per_fragment = fFragmentBytes>>1;
int16_t cl = fOptions->GetChannel(bid, channel);
// Failing to discern which channel we're getting data from seems serious enough to throw
if(cl==-1)
throw std::runtime_error("Failed to parse channel map. I'm gonna just kms now.");

int num_frags = std::ceil(1.*samples_in_pulse/samples_per_fragment);
frags += num_frags;
for (uint16_t frag_i = 0; frag_i < num_frags; frag_i++) {
std::string fragment;
fragment.reserve(fFragmentBytes + fStraxHeaderSize);

// How long is this fragment?
u_int32_t samples_this_fragment = samples_per_fragment;
uint32_t samples_this_fragment = samples_per_fragment;
if (frag_i == num_frags-1)
samples_this_fragment = samples_in_pulse - frag_i*samples_per_fragment;
fFragmentsProcessed++;

u_int64_t time_this_fragment = Time64 + samples_per_fragment*sw*frag_i;
int64_t time_this_fragment = Time64 + samples_per_fragment*sw*frag_i;
fragment.append((char*)&time_this_fragment, sizeof(time_this_fragment));
fragment.append((char*)&samples_this_fragment, sizeof(samples_this_fragment));
fragment.append((char*)&sw, sizeof(sw));
Expand All @@ -327,7 +333,7 @@ int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int b
fragment.append((char*)(payload + frag_i*samples_per_fragment), samples_this_fragment*2);
uint16_t zero_filler = 0;
while((int)fragment.size()<fFragmentBytes+fStraxHeaderSize)
fragment.append((char*)&zero_filler, 2);
fragment.append((char*)&zero_filler, sizeof(zero_filler));

AddFragmentToBuffer(fragment, time_this_fragment, event_time, clock_counter);
} // loop over frag_i
Expand Down
6 changes: 3 additions & 3 deletions StraxInserter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public:
private:
void ProcessDatapacket(data_packet *dp);
uint32_t ProcessEvent(uint32_t*, unsigned, long, uint32_t, int);
int ProcessChannel(uint32_t*, unsigned, int, int, uint32_t, uint32_t, long, int);
int ProcessChannel(uint32_t*, unsigned, int, int, uint32_t, uint32_t, long, int, int&);
void WriteOutFiles(bool end=false);
void GenerateArtificialDeadtime(int64_t, int16_t, uint32_t, int);
void AddFragmentToBuffer(std::string&, int64_t, uint32_t, int);
Expand Down Expand Up @@ -88,10 +88,10 @@ private:
std::map<int, std::atomic_int> fDataPerChan;
std::mutex fDPC_mutex;
std::map<int, long> fBufferCounter;
std::map<int, long> fFragsPerEvent;
std::map<int, long> fEvPerDP;
std::atomic_int fBufferLength;
long fBytesProcessed;
long fFragmentsProcessed;
long fEventsProcessed;

double fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime;
std::thread::id fThreadId;
Expand Down

0 comments on commit b523958

Please sign in to comment.