diff --git a/ddsrecorder_participants/include/ddsrecorder_participants/recorder/output/BaseHandler.hpp b/ddsrecorder_participants/include/ddsrecorder_participants/recorder/output/BaseHandler.hpp index a525cae53..b034877cc 100644 --- a/ddsrecorder_participants/include/ddsrecorder_participants/recorder/output/BaseHandler.hpp +++ b/ddsrecorder_participants/include/ddsrecorder_participants/recorder/output/BaseHandler.hpp @@ -208,15 +208,15 @@ class BaseHandler : public ddspipe::participants::ISchemaHandler std::unique_lock& event_lock); /** - * @brief Writes \c samples to disk. + * @brief Processes a received sample. * - * For each sample in \c samples, it writes it to disk and removes it from \c samples. - * The method ends when \c samples is empty. + * The method is called when a new sample is received. + * It either writes the sample directly to disk, adds it to \c samples_buffer or to a pending list, or discards it. * - * @param [in] samples List of samples to be written. + * @param [in] sample Sample to be processed. */ - virtual void write_samples_( - std::list& samples) = 0; + void process_new_sample_nts_( + const BaseMessage* sample); /** * @brief Adds a sample to \c samples_buffer. @@ -266,6 +266,17 @@ class BaseHandler : public ddspipe::participants::ISchemaHandler void dump_pending_samples_nts_( const std::string& type_name); + /** + * @brief Writes \c samples to disk. + * + * For each sample in \c samples, it writes it to disk and removes it from \c samples. + * The method ends when \c samples is empty. + * + * @param [in] samples List of samples to be written. + */ + virtual void write_samples_( + std::list& samples) = 0; + /** * @brief Remove samples older than [now - event_window]. * @@ -277,7 +288,10 @@ class BaseHandler : public ddspipe::participants::ISchemaHandler void remove_outdated_samples_nts_(); /** - * @brief Create a \c DynamicType and insert it into \c dynamic_types_ . + * @brief Store a \c DynamicType and its dependencies in \c dynamic_types_. + * + * It calls \c store_dynamic_type_ with the type identifier and type object of each dependency of \c dynamic_type. + * It calls \c store_dynamic_type_ with the type identifier and type object of \c type_name. * * @param [in] type_name Name of the type to be stored, used as key in \c dynamic_types map. */ @@ -285,7 +299,7 @@ class BaseHandler : public ddspipe::participants::ISchemaHandler const std::string& type_name); /** - * @brief Create a \c DynamicType and insert it into \c dynamic_types_ . + * @brief Create a \c DynamicType and insert it into \c dynamic_types_. * * @param [in] type_identifier Type identifier to serialize and store. * @param [in] type_object Type object to serialize and store. diff --git a/ddsrecorder_participants/src/cpp/recorder/mcap/McapHandler.cpp b/ddsrecorder_participants/src/cpp/recorder/mcap/McapHandler.cpp index afb15545d..3ac6cbc80 100644 --- a/ddsrecorder_participants/src/cpp/recorder/mcap/McapHandler.cpp +++ b/ddsrecorder_participants/src/cpp/recorder/mcap/McapHandler.cpp @@ -89,11 +89,14 @@ void McapHandler::disable() void McapHandler::add_schema( const fastrtps::types::DynamicType_ptr& dynamic_type) { - std::lock_guard lock(mtx_); - // NOTE: Process schemas even if in STOPPED state to avoid losing them (only sent/received once in discovery) + std::lock_guard lock(mtx_); - assert(nullptr != dynamic_type); + if (dynamic_type == nullptr) + { + logWarning(DDSRECORDER_MCAP_HANDLER, "Received nullptr dynamic type. Skipping..."); + return; + } const std::string type_name = dynamic_type->get_name(); @@ -122,24 +125,27 @@ void McapHandler::add_schema( } mcap::Schema new_schema(name, encoding, data); + logInfo(DDSRECORDER_MCAP_HANDLER, "Schema created: " << new_schema.name << "."); - // Add schema to writer and to schemas map - logInfo(DDSRECORDER_MCAP_HANDLER, "\nAdding schema with name " << type_name << " :\n" << data << "\n"); + // Add schema to writer + logInfo(DDSRECORDER_MCAP_HANDLER, "\nAdding schema for type " << type_name << " :\n" << data << "\n"); mcap_writer_.write(new_schema); - logInfo(DDSRECORDER_MCAP_HANDLER, "Schema created: " << new_schema.name << "."); - - auto it = schemas_.find(type_name); + // Update channels previously created with blank schema + const auto it = schemas_.find(type_name); if (it != schemas_.end()) { - // Update channels previously created with blank schema update_channels_nts_(it->second.id, new_schema.id); } + + // Store schema schemas_[type_name] = std::move(new_schema); + + // Add type to the list of received types received_types_.insert(type_name); - // Every time a dynamic type is added the attachment is newly calculated + // Add type to the DynamicTypesCollection store_dynamic_type_(type_name); if (configuration_.record_types) @@ -147,10 +153,10 @@ void McapHandler::add_schema( mcap_writer_.update_dynamic_types(*Serializer::serialize(&dynamic_types_)); } - // Check if there are any pending samples for this new schema. If so, dump them. - if ((pending_samples_.find(type_name) != pending_samples_.end()) || + // Check if there are any pending samples for this new type. If so, dump them. + if (pending_samples_.find(type_name) != pending_samples_.end() || (state_ == BaseHandlerStateCode::PAUSED && - (pending_samples_paused_.find(type_name) != pending_samples_paused_.end()))) + pending_samples_paused_.find(type_name) != pending_samples_paused_.end())) { dump_pending_samples_nts_(type_name); } @@ -162,16 +168,6 @@ void McapHandler::add_data( { std::unique_lock lock(mtx_); - if (state_ == BaseHandlerStateCode::STOPPED) - { - logInfo(DDSRECORDER_MCAP_HANDLER, "Attempting to add sample through a stopped handler, dropping..."); - return; - } - - logInfo( - DDSRECORDER_MCAP_HANDLER, - "Adding data in topic " << topic); - // Add channel to data mcap::ChannelId channel_id; @@ -187,46 +183,7 @@ void McapHandler::add_data( const auto sample = new McapMessage(data, payload_pool_, topic, channel_id, configuration_.log_publishTime); - if (received_types_.find(topic.type_name) != received_types_.end()) - { - add_sample_to_buffer_nts_(sample); - return; - } - - switch (state_) - { - case BaseHandlerStateCode::RUNNING: - - if (configuration_.max_pending_samples != 0) - { - logInfo( - DDSRECORDER_MCAP_HANDLER, - "Schema for topic " << topic << " not yet available, inserting to pending samples queue."); - - add_sample_to_pending_nts_(sample); - } - else if (!configuration_.only_with_schema) - { - // No schema available + no pending samples -> Add to buffer with blank schema - add_sample_to_buffer_nts_(sample); - } - break; - - case BaseHandlerStateCode::PAUSED: - - logInfo( - DDSRECORDER_MCAP_HANDLER, - "Schema for topic " << topic << " not yet available, inserting to (paused) pending samples queue."); - - pending_samples_paused_[topic.type_name].push_back(sample); - break; - - default: - - // Should not happen, protected with mutex and state verified at beginning - utils::tsnh(utils::Formatter() << "Trying to add sample to a stopped instance."); - break; - } + process_new_sample_nts_(sample); } void McapHandler::write_samples_( diff --git a/ddsrecorder_participants/src/cpp/recorder/output/BaseHandler.cpp b/ddsrecorder_participants/src/cpp/recorder/output/BaseHandler.cpp index 6e01935f2..5b69bd384 100644 --- a/ddsrecorder_participants/src/cpp/recorder/output/BaseHandler.cpp +++ b/ddsrecorder_participants/src/cpp/recorder/output/BaseHandler.cpp @@ -375,6 +375,60 @@ void BaseHandler::stop_event_thread_nts_( pending_samples_paused_.clear(); } +void BaseHandler::process_new_sample_nts_( + const BaseMessage* sample) +{ + if (state_ == BaseHandlerStateCode::STOPPED) + { + logInfo(DDSRECORDER_BASE_HANDLER, "Attempting to add sample through a stopped handler, dropping..."); + return; + } + + logInfo(DDSRECORDER_BASE_HANDLER, "Adding data in topic " << sample->topic); + + if (received_types_.find(sample->topic.type_name) != received_types_.end()) + { + add_sample_to_buffer_nts_(sample); + return; + } + + switch (state_) + { + case BaseHandlerStateCode::RUNNING: + + if (configuration_.max_pending_samples != 0) + { + logInfo(DDSRECORDER_BASE_HANDLER, + "Dynamic type for topic " << sample->topic << " not yet available, inserting to pending " + "samples queue."); + + add_sample_to_pending_nts_(sample); + } + else if (!configuration_.only_with_schema) + { + // No schema available + no pending samples -> Add to buffer with blank schema + add_sample_to_buffer_nts_(sample); + } + break; + + case BaseHandlerStateCode::PAUSED: + + logInfo( + DDSRECORDER_BASE_HANDLER, + "Dynamic type for topic " << sample->topic << " not yet available, inserting to (paused) pending " + "samples queue."); + + pending_samples_paused_[sample->topic.type_name].push_back(sample); + break; + + default: + + // Should not happen, protected with mutex and state verified at beginning + utils::tsnh(utils::Formatter() << "Trying to add sample to a stopped instance."); + break; + } +} + void BaseHandler::add_sample_to_buffer_nts_( const BaseMessage* sample) { @@ -556,13 +610,13 @@ void BaseHandler::store_dynamic_type_( { if (type_identifier == nullptr) { - logWarning(DDSRECORDER_MCAP_HANDLER, "Attempting to store a DynamicType without a type identifier. Exiting."); + logWarning(DDSRECORDER_BASE_HANDLER, "Attempting to store a DynamicType without a type identifier. Exiting."); return; } if (type_object == nullptr) { - logWarning(DDSRECORDER_MCAP_HANDLER, "Attempting to store a DynamicType without a type object. Exiting."); + logWarning(DDSRECORDER_BASE_HANDLER, "Attempting to store a DynamicType without a type object. Exiting."); return; }