Skip to content

Commit

Permalink
Strip new sample generic processing from McapHandler
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate committed May 10, 2024
1 parent 66bc804 commit d46947a
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ class BaseHandler : public ddspipe::participants::ISchemaHandler
std::unique_lock<std::mutex>& event_lock);

/**
* @brief Writes \c samples to disk.
* @brief Processes a received sample.
*
* For each sample in \c samples, it writes it to disk and removes it from \c samples.
* The method ends when \c samples is empty.
* The method is called when a new sample is received.
* It either writes the sample directly to disk, adds it to \c samples_buffer or to a pending list, or discards it.
*
* @param [in] samples List of samples to be written.
* @param [in] sample Sample to be processed.
*/
virtual void write_samples_(
std::list<const BaseMessage*>& samples) = 0;
void process_new_sample_nts_(
const BaseMessage* sample);

/**
* @brief Adds a sample to \c samples_buffer.
Expand Down Expand Up @@ -266,6 +266,17 @@ class BaseHandler : public ddspipe::participants::ISchemaHandler
void dump_pending_samples_nts_(
const std::string& type_name);

/**
* @brief Writes \c samples to disk.
*
* For each sample in \c samples, it writes it to disk and removes it from \c samples.
* The method ends when \c samples is empty.
*
* @param [in] samples List of samples to be written.
*/
virtual void write_samples_(
std::list<const BaseMessage*>& samples) = 0;

/**
* @brief Remove samples older than [now - event_window].
*
Expand All @@ -277,15 +288,18 @@ class BaseHandler : public ddspipe::participants::ISchemaHandler
void remove_outdated_samples_nts_();

/**
* @brief Create a \c DynamicType and insert it into \c dynamic_types_ .
* @brief Store a \c DynamicType and its dependencies in \c dynamic_types_.
*
* It calls \c store_dynamic_type_ with the type identifier and type object of each dependency of \c dynamic_type.
* It calls \c store_dynamic_type_ with the type identifier and type object of \c type_name.
*
* @param [in] type_name Name of the type to be stored, used as key in \c dynamic_types map.
*/
void store_dynamic_type_(
const std::string& type_name);

/**
* @brief Create a \c DynamicType and insert it into \c dynamic_types_ .
* @brief Create a \c DynamicType and insert it into \c dynamic_types_.
*
* @param [in] type_identifier Type identifier to serialize and store.
* @param [in] type_object Type object to serialize and store.
Expand Down
83 changes: 20 additions & 63 deletions ddsrecorder_participants/src/cpp/recorder/mcap/McapHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@ void McapHandler::disable()
void McapHandler::add_schema(
const fastrtps::types::DynamicType_ptr& dynamic_type)
{
std::lock_guard<std::mutex> lock(mtx_);

// NOTE: Process schemas even if in STOPPED state to avoid losing them (only sent/received once in discovery)
std::lock_guard<std::mutex> lock(mtx_);

assert(nullptr != dynamic_type);
if (dynamic_type == nullptr)
{
logWarning(DDSRECORDER_MCAP_HANDLER, "Received nullptr dynamic type. Skipping...");
return;
}

const std::string type_name = dynamic_type->get_name();

Expand Down Expand Up @@ -122,35 +125,38 @@ void McapHandler::add_schema(
}

mcap::Schema new_schema(name, encoding, data);
logInfo(DDSRECORDER_MCAP_HANDLER, "Schema created: " << new_schema.name << ".");

// Add schema to writer and to schemas map
logInfo(DDSRECORDER_MCAP_HANDLER, "\nAdding schema with name " << type_name << " :\n" << data << "\n");
// Add schema to writer
logInfo(DDSRECORDER_MCAP_HANDLER, "\nAdding schema for type " << type_name << " :\n" << data << "\n");

mcap_writer_.write(new_schema);

logInfo(DDSRECORDER_MCAP_HANDLER, "Schema created: " << new_schema.name << ".");

auto it = schemas_.find(type_name);
// Update channels previously created with blank schema
const auto it = schemas_.find(type_name);
if (it != schemas_.end())
{
// Update channels previously created with blank schema
update_channels_nts_(it->second.id, new_schema.id);
}

// Store schema
schemas_[type_name] = std::move(new_schema);

// Add type to the list of received types
received_types_.insert(type_name);

// Every time a dynamic type is added the attachment is newly calculated
// Add type to the DynamicTypesCollection
store_dynamic_type_(type_name);

if (configuration_.record_types)
{
mcap_writer_.update_dynamic_types(*Serializer::serialize(&dynamic_types_));
}

// Check if there are any pending samples for this new schema. If so, dump them.
if ((pending_samples_.find(type_name) != pending_samples_.end()) ||
// Check if there are any pending samples for this new type. If so, dump them.
if (pending_samples_.find(type_name) != pending_samples_.end() ||
(state_ == BaseHandlerStateCode::PAUSED &&
(pending_samples_paused_.find(type_name) != pending_samples_paused_.end())))
pending_samples_paused_.find(type_name) != pending_samples_paused_.end()))
{
dump_pending_samples_nts_(type_name);
}
Expand All @@ -162,16 +168,6 @@ void McapHandler::add_data(
{
std::unique_lock<std::mutex> lock(mtx_);

if (state_ == BaseHandlerStateCode::STOPPED)
{
logInfo(DDSRECORDER_MCAP_HANDLER, "Attempting to add sample through a stopped handler, dropping...");
return;
}

logInfo(
DDSRECORDER_MCAP_HANDLER,
"Adding data in topic " << topic);

// Add channel to data
mcap::ChannelId channel_id;

Expand All @@ -187,46 +183,7 @@ void McapHandler::add_data(

const auto sample = new McapMessage(data, payload_pool_, topic, channel_id, configuration_.log_publishTime);

if (received_types_.find(topic.type_name) != received_types_.end())
{
add_sample_to_buffer_nts_(sample);
return;
}

switch (state_)
{
case BaseHandlerStateCode::RUNNING:

if (configuration_.max_pending_samples != 0)
{
logInfo(
DDSRECORDER_MCAP_HANDLER,
"Schema for topic " << topic << " not yet available, inserting to pending samples queue.");

add_sample_to_pending_nts_(sample);
}
else if (!configuration_.only_with_schema)
{
// No schema available + no pending samples -> Add to buffer with blank schema
add_sample_to_buffer_nts_(sample);
}
break;

case BaseHandlerStateCode::PAUSED:

logInfo(
DDSRECORDER_MCAP_HANDLER,
"Schema for topic " << topic << " not yet available, inserting to (paused) pending samples queue.");

pending_samples_paused_[topic.type_name].push_back(sample);
break;

default:

// Should not happen, protected with mutex and state verified at beginning
utils::tsnh(utils::Formatter() << "Trying to add sample to a stopped instance.");
break;
}
process_new_sample_nts_(sample);
}

void McapHandler::write_samples_(
Expand Down
58 changes: 56 additions & 2 deletions ddsrecorder_participants/src/cpp/recorder/output/BaseHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,60 @@ void BaseHandler::stop_event_thread_nts_(
pending_samples_paused_.clear();
}

void BaseHandler::process_new_sample_nts_(
const BaseMessage* sample)
{
if (state_ == BaseHandlerStateCode::STOPPED)
{
logInfo(DDSRECORDER_BASE_HANDLER, "Attempting to add sample through a stopped handler, dropping...");
return;
}

logInfo(DDSRECORDER_BASE_HANDLER, "Adding data in topic " << sample->topic);

if (received_types_.find(sample->topic.type_name) != received_types_.end())
{
add_sample_to_buffer_nts_(sample);
return;
}

switch (state_)
{
case BaseHandlerStateCode::RUNNING:

if (configuration_.max_pending_samples != 0)
{
logInfo(DDSRECORDER_BASE_HANDLER,
"Dynamic type for topic " << sample->topic << " not yet available, inserting to pending "
"samples queue.");

add_sample_to_pending_nts_(sample);
}
else if (!configuration_.only_with_schema)
{
// No schema available + no pending samples -> Add to buffer with blank schema
add_sample_to_buffer_nts_(sample);
}
break;

case BaseHandlerStateCode::PAUSED:

logInfo(
DDSRECORDER_BASE_HANDLER,
"Dynamic type for topic " << sample->topic << " not yet available, inserting to (paused) pending "
"samples queue.");

pending_samples_paused_[sample->topic.type_name].push_back(sample);
break;

default:

// Should not happen, protected with mutex and state verified at beginning
utils::tsnh(utils::Formatter() << "Trying to add sample to a stopped instance.");
break;
}
}

void BaseHandler::add_sample_to_buffer_nts_(
const BaseMessage* sample)
{
Expand Down Expand Up @@ -556,13 +610,13 @@ void BaseHandler::store_dynamic_type_(
{
if (type_identifier == nullptr)
{
logWarning(DDSRECORDER_MCAP_HANDLER, "Attempting to store a DynamicType without a type identifier. Exiting.");
logWarning(DDSRECORDER_BASE_HANDLER, "Attempting to store a DynamicType without a type identifier. Exiting.");
return;
}

if (type_object == nullptr)
{
logWarning(DDSRECORDER_MCAP_HANDLER, "Attempting to store a DynamicType without a type object. Exiting.");
logWarning(DDSRECORDER_BASE_HANDLER, "Attempting to store a DynamicType without a type object. Exiting.");
return;
}

Expand Down

0 comments on commit d46947a

Please sign in to comment.