diff --git a/include/dfmodules/DataStore.hpp b/include/dfmodules/DataStore.hpp
index c75f1971..b78e4f8b 100644
--- a/include/dfmodules/DataStore.hpp
+++ b/include/dfmodules/DataStore.hpp
@@ -75,6 +75,17 @@ ERS_DECLARE_ISSUE(dfmodules,
                   ((std::string)mod_name)((std::string)description))
 /// @endcond LCOV_EXCL_STOP
 
+/**
+ * @brief An ERS Issue for DataStore problems in which it is
+ * reasonable to skip any warning or error message.
+ * @cond Doxygen doesn't like ERS macros LCOV_EXCL_START
+ */
+ERS_DECLARE_ISSUE(dfmodules,
+                  IgnorableDataStoreProblem,
+                  "Module " << mod_name << ": A problem was encountered when " << description,
+                  ((std::string)mod_name)((std::string)description))
+/// @endcond LCOV_EXCL_STOP
+
 /**
  * @brief An ERS Issue for DataStore problems in which it is
  * not clear whether retrying the operation might succeed or not.
diff --git a/plugins/HDF5DataStore.hpp b/plugins/HDF5DataStore.hpp
index 389ffdec..53ceb544 100644
--- a/plugins/HDF5DataStore.hpp
+++ b/plugins/HDF5DataStore.hpp
@@ -257,9 +257,14 @@ class HDF5DataStore : public DataStore
     }
 
     // write the record
-    m_file_handle->write(ts);
-    m_recorded_size = m_file_handle->get_recorded_size();
-  }
+    try {
+      m_file_handle->write(ts);
+      m_recorded_size = m_file_handle->get_recorded_size();
+    } catch (hdf5libs::TimeSliceAlreadyExists const& excpt) {
+      std::string msg = "writing a time slice to file " + m_file_handle->get_file_name();
+      throw IgnorableDataStoreProblem(ERS_HERE, get_name(), msg, excpt);
+    }
+  }
 
   /**
    * @brief Informs the HDF5DataStore that writes or reads of records
diff --git a/plugins/TPStreamWriter.cpp b/plugins/TPStreamWriter.cpp
index 9b81c99f..0432569e 100644
--- a/plugins/TPStreamWriter.cpp
+++ b/plugins/TPStreamWriter.cpp
@@ -60,9 +60,15 @@ TPStreamWriter::get_info(opmonlib::InfoCollector& ci, int /*level*/)
 {
   tpstreamwriterinfo::Info info;
 
-  info.tpset_received = m_tpset_received.exchange(0);
-  info.tpset_written = m_tpset_written.exchange(0);
+  info.heartbeat_tpsets_received = m_heartbeat_tpsets.exchange(0);
+  info.tpsets_with_tps_received = m_tpsets_with_tps.exchange(0);
+  info.tps_received = m_tps_received.exchange(0);
+  info.tps_written = m_tps_written.exchange(0);
+  info.timeslices_written = m_timeslices_written.exchange(0);
   info.bytes_output = m_bytes_output.exchange(0);
+  info.tardy_timeslice_max_seconds = m_tardy_timeslice_max_seconds.exchange(0.0);
+  info.total_tps_received = m_total_tps_received.load();
+  info.total_tps_written = m_total_tps_written.load();
 
   ci.add(info);
 }
@@ -76,6 +82,8 @@ TPStreamWriter::do_conf(const data_t& payload)
   m_accumulation_inactivity_time_before_write =
     std::chrono::milliseconds(static_cast<int>(1000*conf_params.tp_accumulation_inactivity_time_before_write_sec));
   m_source_id = conf_params.source_id;
+  warn_user_when_tardy_tps_are_discarded = conf_params.warn_user_when_tardy_tps_are_discarded;
+  m_accumulation_interval_seconds = ((double) m_accumulation_interval_ticks) / 62500000.0;
 
   // create the DataStore instance here
   try {
@@ -98,6 +106,8 @@ TPStreamWriter::do_start(const nlohmann::json& payload)
   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
   rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
   m_run_number = start_params.run;
+  m_total_tps_received.store(0);
+  m_total_tps_written.store(0);
 
   // 06-Mar-2022, KAB: added this call to allow DataStore to prepare for the run.
   // I've put this call fairly early in this method because it could throw an
@@ -159,15 +169,17 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
   TPBundleHandler tp_bundle_handler(m_accumulation_interval_ticks, m_run_number, m_accumulation_inactivity_time_before_write);
 
   bool possible_pending_data = true;
+  size_t largest_timeslice_number = 0;
   while (running_flag.load() || possible_pending_data) {
     trigger::TPSet tpset;
     try {
       tpset = m_tpset_source->receive(m_queue_timeout);
       ++n_tpset_received;
-      ++m_tpset_received;
 
-      if (tpset.type == trigger::TPSet::Type::kHeartbeat)
+      if (tpset.type == trigger::TPSet::Type::kHeartbeat) {
+        ++m_heartbeat_tpsets;
         continue;
+      }
 
       TLOG_DEBUG(21) << "Number of TPs in TPSet is " << tpset.objects.size() << ", Source ID is "
                      << tpset.origin << ", seqno is " << tpset.seqno << ", start timestamp is "
                      << tpset.start_time << ", run number is "
@@ -181,8 +193,12 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
                        << m_run_number << "), Source ID is " << tpset.origin << ", seqno is " << tpset.seqno;
         continue;
       }
+      ++m_tpsets_with_tps;
+      size_t num_tps_in_tpset = tpset.objects.size();
       tp_bundle_handler.add_tpset(std::move(tpset));
+      m_tps_received += num_tps_in_tpset;
+      m_total_tps_received += num_tps_in_tpset;
     } catch (iomanager::TimeoutExpired&) {
       if (running_flag.load()) {continue;}
     }
 
@@ -194,6 +210,13 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
       list_of_timeslices = tp_bundle_handler.get_all_remaining_timeslices();
       possible_pending_data = false;
     }
+
+    // keep track of the largest timeslice number (for reporting on tardy ones)
+    for (auto& timeslice_ptr : list_of_timeslices) {
+      largest_timeslice_number = std::max(timeslice_ptr->get_header().timeslice_number, largest_timeslice_number);
+    }
+
+    // attempt to write out each TimeSlice
     for (auto& timeslice_ptr : list_of_timeslices) {
       daqdataformats::SourceID sid(daqdataformats::SourceID::Subsystem::kTRBuilder, m_source_id);
       timeslice_ptr->set_element_id(sid);
@@ -205,8 +228,11 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
         should_retry = false;
         try {
           m_data_writer->write(*timeslice_ptr);
-          ++m_tpset_written;
+          ++m_timeslices_written;
           m_bytes_output += timeslice_ptr->get_total_size_bytes();
+          size_t number_of_tps_written = (timeslice_ptr->get_sum_of_fragment_payload_sizes() / sizeof(trgdataformats::TriggerPrimitive));
+          m_tps_written += number_of_tps_written;
+          m_total_tps_written += number_of_tps_written;
         } catch (const RetryableDataStoreProblem& excpt) {
           should_retry = true;
           ers::error(DataWritingProblem(ERS_HERE,
@@ -219,6 +245,24 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
           }
           usleep(retry_wait_usec);
           retry_wait_usec *= 2;
+        } catch (const IgnorableDataStoreProblem& excpt) {
+          int timeslice_number_diff = largest_timeslice_number - timeslice_ptr->get_header().timeslice_number;
+          double seconds_too_late = m_accumulation_interval_seconds * timeslice_number_diff;
+          m_tardy_timeslice_max_seconds = std::max(m_tardy_timeslice_max_seconds.load(), seconds_too_late);
+          if (warn_user_when_tardy_tps_are_discarded) {
+            std::ostringstream sid_list;
+            bool first_frag = true;
+            for (auto const& frag_ptr : timeslice_ptr->get_fragments_ref()) {
+              if (first_frag) {first_frag = false;}
+              else {sid_list << ",";}
+              sid_list << frag_ptr->get_element_id().to_string();
+            }
+            ers::warning(TardyTPsDiscarded(ERS_HERE,
+                                           get_name(),
+                                           sid_list.str(),
+                                           timeslice_ptr->get_header().timeslice_number,
+                                           seconds_too_late));
+          }
         } catch (const std::exception& excpt) {
           ers::warning(DataWritingProblem(ERS_HERE,
                                           get_name(),
diff --git a/plugins/TPStreamWriter.hpp b/plugins/TPStreamWriter.hpp
index 49935f10..4d44f67a 100644
--- a/plugins/TPStreamWriter.hpp
+++ b/plugins/TPStreamWriter.hpp
@@ -14,8 +14,8 @@
 #include "dfmodules/DataStore.hpp"
 
 #include "appfwk/DAQModule.hpp"
-#include "iomanager/Receiver.hpp"
 #include "daqdataformats/TimeSlice.hpp"
+#include "iomanager/Receiver.hpp"
 #include "trigger/TPSet.hpp"
 #include "utilities/WorkerThread.hpp"
 
@@ -63,6 +63,8 @@ class TPStreamWriter : public dunedaq::appfwk::DAQModule
   std::chrono::steady_clock::duration m_accumulation_inactivity_time_before_write;
   daqdataformats::run_number_t m_run_number;
   uint32_t m_source_id; // NOLINT(build/unsigned)
+  bool warn_user_when_tardy_tps_are_discarded;
+  double m_accumulation_interval_seconds;
 
   // Queue sources and sinks
   using incoming_t = trigger::TPSet;
@@ -73,10 +75,15 @@ class TPStreamWriter : public dunedaq::appfwk::DAQModule
   std::unique_ptr<DataStore> m_data_writer;
 
   // Metrics
-  std::atomic<uint64_t> m_tpset_received = { 0 }; // NOLINT(build/unsigned)
-  std::atomic<uint64_t> m_tpset_written = { 0 };  // NOLINT(build/unsigned)
-  std::atomic<uint64_t> m_bytes_output = { 0 };   // NOLINT(build/unsigned)
-
+  std::atomic<uint64_t> m_heartbeat_tpsets = { 0 };   // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_tpsets_with_tps = { 0 };    // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_tps_received = { 0 };       // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_tps_written = { 0 };        // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_timeslices_written = { 0 }; // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_bytes_output = { 0 };       // NOLINT(build/unsigned)
+  std::atomic<double> m_tardy_timeslice_max_seconds = { 0.0 }; // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_total_tps_received = { 0 }; // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_total_tps_written = { 0 };  // NOLINT(build/unsigned)
 };
 
 } // namespace dfmodules
@@ -95,6 +102,14 @@ ERS_DECLARE_ISSUE_BASE(dfmodules,
                        ((std::string)name),
                        ((size_t)trnum)((size_t)runnum))
 
+ERS_DECLARE_ISSUE_BASE(dfmodules,
+                       TardyTPsDiscarded,
+                       appfwk::GeneralDAQModuleIssue,
+                       "Tardy TPs from SourceIDs [" << sid_list << "] were discarded from TimeSlice number "
+                         << trnum << " (~" << sec_too_late << " sec too late)",
+                       ((std::string)name),
+                       ((std::string)sid_list)((size_t)trnum)((float)sec_too_late))
+
 } // namespace dunedaq
 
 #endif // DFMODULES_PLUGINS_TPSTREAMWRITER_HPP_
diff --git a/schema/dfmodules/info/tpstreamwriterinfo.jsonnet b/schema/dfmodules/info/tpstreamwriterinfo.jsonnet
index ca976e44..51526cae 100644
--- a/schema/dfmodules/info/tpstreamwriterinfo.jsonnet
+++ b/schema/dfmodules/info/tpstreamwriterinfo.jsonnet
@@ -8,10 +8,18 @@ local s = moo.oschema.schema("dunedaq.dfmodules.tpstreamwriterinfo");
 
 local info = {
     uint8  : s.number("uint8", "u8", doc="An unsigned of 8 bytes"),
+    float4 : s.number("float4", "f4", doc="A float of 4 bytes"),
+
     info: s.record("Info", [
-       s.field("tpset_received", self.uint8, 0, doc="incremental received tpset counter"),
-       s.field("tpset_written", self.uint8, 0, doc="incremental written tpset counter"),
-       s.field("bytes_output", self.uint8, 0, doc="incremental number of bytes that have been written out"),
+       s.field("heartbeat_tpsets_received", self.uint8, 0, doc="incremental count of heartbeat TPSets received"),
+       s.field("tpsets_with_tps_received", self.uint8, 0, doc="incremental count of TPSets received that contain TPs"),
+       s.field("tps_received", self.uint8, 0, doc="incremental count of TPs that have been received"),
+       s.field("tps_written", self.uint8, 0, doc="incremental count of TPs that have been written out"),
+       s.field("timeslices_written", self.uint8, 0, doc="incremental count of TimeSlices that have been written out"),
+       s.field("bytes_output", self.uint8, 0, doc="incremental number of bytes that have been written out"),
+       s.field("tardy_timeslice_max_seconds", self.float4, 0, doc="incremental max amount of time that a TimeSlice was tardy"),
+       s.field("total_tps_received", self.uint8, 0, doc="count of TPs that have been received in the current run"),
+       s.field("total_tps_written", self.uint8, 0, doc="count of TPs that have been written out in the current run"),
    ], doc="TPSet writer information")
 };
diff --git a/schema/dfmodules/tpstreamwriter.jsonnet b/schema/dfmodules/tpstreamwriter.jsonnet
index 8130c400..75c111cb 100644
--- a/schema/dfmodules/tpstreamwriter.jsonnet
+++ b/schema/dfmodules/tpstreamwriter.jsonnet
@@ -11,6 +11,8 @@ local types = {
 
     float : s.number("Float", "f4", doc="A floating point number of 4 bytes"),
 
+    flag: s.boolean("Flag", doc="Parameter that can be used to enable or disable functionality"),
+
     conf: s.record("ConfParams", [
         s.field("tp_accumulation_interval_ticks", self.size, 62500000,
                 doc="Size of the TP accumulation window, measured in clock ticks"),
@@ -19,6 +21,8 @@ local types = {
                 doc="Parameters that configure the DataStore associated with this TPStreamWriter"),
         s.field("source_id", self.sourceid_number, 999,
                 doc="Source ID of TPSW instance, added to time slice header"),
+        s.field("warn_user_when_tardy_tps_are_discarded", self.flag, true,
+                doc="Whether to warn users when TimeSlices that contain tardy TPs are discarded"),
    ], doc="TPStreamWriter configuration parameters"),
 };
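
Illustrative sketch (not part of the patch above): the "sec too late" value reported by
the new TardyTPsDiscarded warning is just the accumulation interval multiplied by how
many timeslice numbers the discarded slice trails the largest one seen so far. The
stand-alone example below renders that arithmetic using the default 62500000-tick window
from the schema and the 62.5 MHz clock implied by the divisor in do_conf(); the
timeslice numbers are invented for the example.

#include <cstdio>

int main()
{
  // From do_conf(): window length in ticks divided by the 62.5 MHz clock
  // frequency gives the accumulation interval in seconds (1.0 s by default).
  double accumulation_interval_seconds = 62500000.0 / 62500000.0;

  // Hypothetical scenario: the writer has already written timeslice 42, and
  // tardy TPs now arrive that belong in timeslice 39. Writing that slice again
  // raises hdf5libs::TimeSliceAlreadyExists, which HDF5DataStore rethrows as
  // IgnorableDataStoreProblem, so the tardy TPs are discarded and reported.
  unsigned largest_timeslice_number = 42;
  unsigned tardy_timeslice_number = 39;

  double seconds_too_late =
    accumulation_interval_seconds * (largest_timeslice_number - tardy_timeslice_number);

  std::printf("TimeSlice %u discarded (~%.1f sec too late)\n",
              tardy_timeslice_number, seconds_too_late); // ~3.0 sec too late
  return 0;
}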