Skip to content

Commit

Permalink
Merge pull request #358 from DUNE-DAQ/kbiery/tpsw_ignore_late_tps
Browse files Browse the repository at this point in the history
Changes to avoid warning messages when tardy TPs arrive at the TPStreamWriter
  • Loading branch information
wesketchum authored Jul 6, 2024
2 parents a7da594 + 475f215 commit bfbdf46
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 16 deletions.
11 changes: 11 additions & 0 deletions include/dfmodules/DataStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ ERS_DECLARE_ISSUE(dfmodules,
((std::string)mod_name)((std::string)description))
/// @endcond LCOV_EXCL_STOP

/**
* @brief An ERS Issue for DataStore problems in which it is
* reasonable to skip any warning or error message.
* @cond Doxygen doesn't like ERS macros LCOV_EXCL_START
*/
ERS_DECLARE_ISSUE(dfmodules,
IgnorableDataStoreProblem,
"Module " << mod_name << ": A problem was encountered when " << description,
((std::string)mod_name)((std::string)description))
/// @endcond LCOV_EXCL_STOP

/**
* @brief An ERS Issue for DataStore problems in which it is
* not clear whether retrying the operation might succeed or not.
Expand Down
11 changes: 8 additions & 3 deletions plugins/HDF5DataStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,14 @@ class HDF5DataStore : public DataStore
}

// write the record
m_file_handle->write(ts);
m_recorded_size = m_file_handle->get_recorded_size();
}
try {
m_file_handle->write(ts);
m_recorded_size = m_file_handle->get_recorded_size();
} catch (hdf5libs::TimeSliceAlreadyExists const& excpt) {
std::string msg = "writing a time slice to file " + m_file_handle->get_file_name();
throw IgnorableDataStoreProblem(ERS_HERE, get_name(), msg, excpt);
}
}

/**
* @brief Informs the HDF5DataStore that writes or reads of records
Expand Down
54 changes: 49 additions & 5 deletions plugins/TPStreamWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ TPStreamWriter::get_info(opmonlib::InfoCollector& ci, int /*level*/)
{
tpstreamwriterinfo::Info info;

info.tpset_received = m_tpset_received.exchange(0);
info.tpset_written = m_tpset_written.exchange(0);
info.heartbeat_tpsets_received = m_heartbeat_tpsets.exchange(0);
info.tpsets_with_tps_received = m_tpsets_with_tps.exchange(0);
info.tps_received = m_tps_received.exchange(0);
info.tps_written = m_tps_written.exchange(0);
info.timeslices_written = m_timeslices_written.exchange(0);
info.bytes_output = m_bytes_output.exchange(0);
info.tardy_timeslice_max_seconds = m_tardy_timeslice_max_seconds.exchange(0.0);
info.total_tps_received = m_total_tps_received.load();
info.total_tps_written = m_total_tps_written.load();

ci.add(info);
}
Expand All @@ -76,6 +82,8 @@ TPStreamWriter::do_conf(const data_t& payload)
m_accumulation_inactivity_time_before_write =
std::chrono::milliseconds(static_cast<int>(1000*conf_params.tp_accumulation_inactivity_time_before_write_sec));
m_source_id = conf_params.source_id;
warn_user_when_tardy_tps_are_discarded = conf_params.warn_user_when_tardy_tps_are_discarded;
m_accumulation_interval_seconds = ((double) m_accumulation_interval_ticks) / 62500000.0;

// create the DataStore instance here
try {
Expand All @@ -98,6 +106,8 @@ TPStreamWriter::do_start(const nlohmann::json& payload)
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
m_run_number = start_params.run;
m_total_tps_received.store(0);
m_total_tps_written.store(0);

// 06-Mar-2022, KAB: added this call to allow DataStore to prepare for the run.
// I've put this call fairly early in this method because it could throw an
Expand Down Expand Up @@ -159,15 +169,17 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
TPBundleHandler tp_bundle_handler(m_accumulation_interval_ticks, m_run_number, m_accumulation_inactivity_time_before_write);

bool possible_pending_data = true;
size_t largest_timeslice_number = 0;
while (running_flag.load() || possible_pending_data) {
trigger::TPSet tpset;
try {
tpset = m_tpset_source->receive(m_queue_timeout);
++n_tpset_received;
++m_tpset_received;

if (tpset.type == trigger::TPSet::Type::kHeartbeat)
if (tpset.type == trigger::TPSet::Type::kHeartbeat) {
++m_heartbeat_tpsets;
continue;
}

TLOG_DEBUG(21) << "Number of TPs in TPSet is " << tpset.objects.size() << ", Source ID is " << tpset.origin
<< ", seqno is " << tpset.seqno << ", start timestamp is " << tpset.start_time << ", run number is "
Expand All @@ -181,8 +193,12 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
<< m_run_number << "), Source ID is " << tpset.origin << ", seqno is " << tpset.seqno;
continue;
}
++m_tpsets_with_tps;

size_t num_tps_in_tpset = tpset.objects.size();
tp_bundle_handler.add_tpset(std::move(tpset));
m_tps_received += num_tps_in_tpset;
m_total_tps_received += num_tps_in_tpset;
} catch (iomanager::TimeoutExpired&) {
if (running_flag.load()) {continue;}
}
Expand All @@ -194,6 +210,13 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
list_of_timeslices = tp_bundle_handler.get_all_remaining_timeslices();
possible_pending_data = false;
}

// keep track of the largest timeslice number (for reporting on tardy ones)
for (auto& timeslice_ptr : list_of_timeslices) {
largest_timeslice_number = std::max(timeslice_ptr->get_header().timeslice_number, largest_timeslice_number);
}

// attempt to write out each TimeSlice
for (auto& timeslice_ptr : list_of_timeslices) {
daqdataformats::SourceID sid(daqdataformats::SourceID::Subsystem::kTRBuilder, m_source_id);
timeslice_ptr->set_element_id(sid);
Expand All @@ -205,8 +228,11 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
should_retry = false;
try {
m_data_writer->write(*timeslice_ptr);
++m_tpset_written;
++m_timeslices_written;
m_bytes_output += timeslice_ptr->get_total_size_bytes();
size_t number_of_tps_written = (timeslice_ptr->get_sum_of_fragment_payload_sizes() / sizeof(trgdataformats::TriggerPrimitive));
m_tps_written += number_of_tps_written;
m_total_tps_written += number_of_tps_written;
} catch (const RetryableDataStoreProblem& excpt) {
should_retry = true;
ers::error(DataWritingProblem(ERS_HERE,
Expand All @@ -219,6 +245,24 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
}
usleep(retry_wait_usec);
retry_wait_usec *= 2;
} catch (const IgnorableDataStoreProblem& excpt) {
int timeslice_number_diff = largest_timeslice_number - timeslice_ptr->get_header().timeslice_number;
double seconds_too_late = m_accumulation_interval_seconds * timeslice_number_diff;
m_tardy_timeslice_max_seconds = std::max(m_tardy_timeslice_max_seconds.load(), seconds_too_late);
if (warn_user_when_tardy_tps_are_discarded) {
std::ostringstream sid_list;
bool first_frag = true;
for (auto const& frag_ptr : timeslice_ptr->get_fragments_ref()) {
if (first_frag) {first_frag = false;}
else {sid_list << ",";}
sid_list << frag_ptr->get_element_id().to_string();
}
ers::warning(TardyTPsDiscarded(ERS_HERE,
get_name(),
sid_list.str(),
timeslice_ptr->get_header().timeslice_number,
seconds_too_late));
}
} catch (const std::exception& excpt) {
ers::warning(DataWritingProblem(ERS_HERE,
get_name(),
Expand Down
25 changes: 20 additions & 5 deletions plugins/TPStreamWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
#include "dfmodules/DataStore.hpp"

#include "appfwk/DAQModule.hpp"
#include "iomanager/Receiver.hpp"
#include "daqdataformats/TimeSlice.hpp"
#include "iomanager/Receiver.hpp"
#include "trigger/TPSet.hpp"
#include "utilities/WorkerThread.hpp"

Expand Down Expand Up @@ -63,6 +63,8 @@ class TPStreamWriter : public dunedaq::appfwk::DAQModule
std::chrono::steady_clock::duration m_accumulation_inactivity_time_before_write;
daqdataformats::run_number_t m_run_number;
uint32_t m_source_id; // NOLINT(build/unsigned)
bool warn_user_when_tardy_tps_are_discarded;
double m_accumulation_interval_seconds;

// Queue sources and sinks
using incoming_t = trigger::TPSet;
Expand All @@ -73,10 +75,15 @@ class TPStreamWriter : public dunedaq::appfwk::DAQModule
std::unique_ptr<DataStore> m_data_writer;

// Metrics
std::atomic<uint64_t> m_tpset_received = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_tpset_written = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_bytes_output = { 0 }; // NOLINT(build/unsigned)

std::atomic<uint64_t> m_heartbeat_tpsets = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_tpsets_with_tps = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_tps_received = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_tps_written = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_timeslices_written = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_bytes_output = { 0 }; // NOLINT(build/unsigned)
std::atomic<double> m_tardy_timeslice_max_seconds = { 0.0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_total_tps_received = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_total_tps_written = { 0 }; // NOLINT(build/unsigned)
};
} // namespace dfmodules

Expand All @@ -95,6 +102,14 @@ ERS_DECLARE_ISSUE_BASE(dfmodules,
((std::string)name),
((size_t)trnum)((size_t)runnum))

ERS_DECLARE_ISSUE_BASE(dfmodules,
TardyTPsDiscarded,
appfwk::GeneralDAQModuleIssue,
"Tardy TPs from SourceIDs [" << sid_list << "] were discarded from TimeSlice number "
<< trnum << " (~" << sec_too_late << " sec too late)",
((std::string)name),
((std::string)sid_list)((size_t)trnum)((float)sec_too_late))

} // namespace dunedaq

#endif // DFMODULES_PLUGINS_TPSTREAMWRITER_HPP_
14 changes: 11 additions & 3 deletions schema/dfmodules/info/tpstreamwriterinfo.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ local s = moo.oschema.schema("dunedaq.dfmodules.tpstreamwriterinfo");
local info = {
uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"),

float4 : s.number("float4", "f4", doc="A float of 4 bytes"),

info: s.record("Info", [
s.field("tpset_received", self.uint8, 0, doc="incremental received tpset counter"),
s.field("tpset_written", self.uint8, 0, doc="incremental written tpset counter"),
s.field("bytes_output", self.uint8, 0, doc="incremental number of bytes that have been written out"),
s.field("heartbeat_tpsets_received", self.uint8, 0, doc="incremental count of heartbeat TPSets received"),
s.field("tpsets_with_tps_received", self.uint8, 0, doc="incremental count of TPSets received that contain TPs"),
s.field("tps_received", self.uint8, 0, doc="incremental count of TPs that have been received"),
s.field("tps_written", self.uint8, 0, doc="incremental count of TPs that have been written out"),
s.field("timeslices_written", self.uint8, 0, doc="incremental count of TimeSlices that have been written out"),
s.field("bytes_output", self.uint8, 0, doc="incremental number of bytes that have been written out"),
s.field("tardy_timeslice_max_seconds", self.float4, 0, doc="incremental max amount of time that a TimeSlice was tardy"),
s.field("total_tps_received", self.uint8, 0, doc="count of TPs that have been received in the current run"),
s.field("total_tps_written", self.uint8, 0, doc="count of TPs that have been written out in the current run"),
], doc="TPSet writer information")
};

Expand Down
4 changes: 4 additions & 0 deletions schema/dfmodules/tpstreamwriter.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ local types = {

float : s.number("Float", "f4", doc="A floating point number of 4 bytes"),

flag: s.boolean("Flag", doc="Parameter that can be used to enable or disable functionality"),

conf: s.record("ConfParams", [
s.field("tp_accumulation_interval_ticks", self.size, 62500000,
doc="Size of the TP accumulation window, measured in clock ticks"),
Expand All @@ -19,6 +21,8 @@ local types = {
s.field("data_store_parameters", self.dsparams,
doc="Parameters that configure the DataStore associated with this TPStreamWriter"),
s.field("source_id", self.sourceid_number, 999, doc="Source ID of TPSW instance, added to time slice header"),
s.field("warn_user_when_tardy_tps_are_discarded", self.flag, true,
doc="Whether to warn users when TimeSlices that contain tardy TPs are discarded"),
], doc="TPStreamWriter configuration parameters"),

};
Expand Down

0 comments on commit bfbdf46

Please sign in to comment.