Skip to content

Commit

Permalink
PoC: record data in SQLite
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate committed May 14, 2024
1 parent 2e4c08d commit 2bc8a6b
Show file tree
Hide file tree
Showing 16 changed files with 1,023 additions and 22 deletions.
98 changes: 77 additions & 21 deletions ddsrecorder/src/cpp/tool/DdsRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include <ddsrecorder_participants/recorder/mcap/McapHandler.hpp>
#include <ddsrecorder_participants/recorder/mcap/McapHandlerConfiguration.hpp>
#include <ddsrecorder_participants/recorder/output/BaseHandler.hpp>
#include <ddsrecorder_participants/recorder/output/OutputSettings.hpp>
#include <ddsrecorder_participants/recorder/sql/SqlHandler.hpp>
#include <ddsrecorder_participants/recorder/sql/SqlHandlerConfiguration.hpp>

#include "DdsRecorder.hpp"

Expand Down Expand Up @@ -86,7 +89,19 @@ DdsRecorder::DdsRecorder(
output_settings.prepend_timestamp = false;
}

output_settings.extension = ".mcap";
switch (configuration_.output_library)
{
case participants::OutputLibrary::mcap:
output_settings.extension = ".mcap";
break;
case participants::OutputLibrary::sql:
output_settings.extension = ".db";
break;
default:
utils::tsnh(utils::Formatter() << "The library " << configuration_.output_library << " is not valid.");
break;
}

output_settings.safety_margin = configuration_.safety_margin;
output_settings.file_rotation = configuration_.output_resource_limits_file_rotation;
output_settings.max_file_size = configuration_.output_resource_limits_max_file_size;
Expand All @@ -103,32 +118,73 @@ DdsRecorder::DdsRecorder(
output_settings.max_size = output_settings.max_file_size;
}

// Create MCAP Handler configuration
participants::McapHandlerConfiguration handler_config(
output_settings,
configuration_.max_pending_samples,
configuration_.buffer_size,
configuration_.event_window,
configuration_.cleanup_period,
configuration_.log_publish_time,
configuration_.only_with_type,
configuration_.mcap_writer_options,
configuration_.record_types,
configuration_.ros2_types);

if (file_tracker == nullptr)
{
// Create the File Tracker
file_tracker.reset(new participants::FileTracker(output_settings));
}

// Create MCAP Handler
handler_ = std::make_shared<participants::McapHandler>(
handler_config,
payload_pool_,
file_tracker,
recorder_to_handler_state_(init_state),
std::bind(&DdsRecorder::on_disk_full, this));
const auto handler_state = recorder_to_handler_state_(init_state);
const auto on_disk_full_lambda = std::bind(&DdsRecorder::on_disk_full, this);

switch (configuration_.output_library)
{
case participants::OutputLibrary::mcap:
{
// Create MCAP Handler configuration
participants::McapHandlerConfiguration handler_config(
output_settings,
configuration_.max_pending_samples,
configuration_.buffer_size,
configuration_.event_window,
configuration_.cleanup_period,
configuration_.log_publish_time,
configuration_.only_with_type,
configuration_.mcap_writer_options,
configuration_.record_types,
configuration_.ros2_types);

// Create MCAP Handler
handler_ = std::make_shared<participants::McapHandler>(
handler_config,
payload_pool_,
file_tracker,
handler_state,
on_disk_full_lambda);

break;
}
case participants::OutputLibrary::sql:
{
// Create SQL Handler configuration
participants::SqlHandlerConfiguration handler_config(
output_settings,
configuration_.max_pending_samples,
configuration_.buffer_size,
configuration_.event_window,
configuration_.cleanup_period,
configuration_.log_publish_time,
configuration_.only_with_type,
configuration_.record_types,
configuration_.ros2_types);

// Create MCAP Handler
handler_ = std::make_shared<participants::SqlHandler>(
handler_config,
payload_pool_,
file_tracker,
handler_state,
on_disk_full_lambda);

break;
}
default:
{
utils::tsnh(utils::Formatter() << "The library " << configuration_.output_library << " is not valid.");
break;
}
}


// Create DynTypes Participant
dyn_participant_ = std::make_shared<DynTypesParticipant>(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file SqlMessage.hpp
*/

#pragma once

#include <memory>

#include <ddspipe_core/efficiency/payload/PayloadPool.hpp>
#include <ddspipe_core/types/data/RtpsPayloadData.hpp>
#include <ddspipe_core/types/topic/dds/DdsTopic.hpp>

#include <ddsrecorder_participants/recorder/message/BaseMessage.hpp>


namespace eprosima {
namespace ddsrecorder {
namespace participants {

/**
* Structure extending a \c BaseMessage for SQLite.
*/
struct SqlMessage : public BaseMessage
{
SqlMessage() = default;

/**
* @brief Construct a \c SqlMessage.
*/
SqlMessage(
const ddspipe::core::types::RtpsPayloadData& payload,
std::shared_ptr<ddspipe::core::PayloadPool> payload_pool,
const ddspipe::core::types::DdsTopic& topic,
const bool log_publish_time);
};

} /* namespace participants */
} /* namespace ddsrecorder */
} /* namespace eprosima */
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@
#include <cstdint>
#include <string>

#include <cpp_utils/macros/custom_enumeration.hpp>

namespace eprosima {
namespace ddsrecorder {
namespace participants {

//! Supported output libraries
ENUMERATION_BUILDER(
OutputLibrary,
mcap, //! MCAP library.
sql //! SQL library.
);

/**
* Structure encapsulating all output configuration options.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file SqlHandler.hpp
*/

#pragma once

#include <functional>
#include <list>
#include <memory>

#include <fastrtps/types/DynamicType.h>

#include <ddspipe_core/efficiency/payload/PayloadPool.hpp>
#include <ddspipe_core/types/data/RtpsPayloadData.hpp>
#include <ddspipe_core/types/topic/dds/DdsTopic.hpp>

#include <ddsrecorder_participants/library/library_dll.h>
#include <ddsrecorder_participants/recorder/message/BaseMessage.hpp>
#include <ddsrecorder_participants/recorder/output/BaseHandler.hpp>
#include <ddsrecorder_participants/recorder/output/FileTracker.hpp>
#include <ddsrecorder_participants/recorder/sql/SqlHandlerConfiguration.hpp>
#include <ddsrecorder_participants/recorder/sql/SqlWriter.hpp>

namespace eprosima {
namespace ddsrecorder {
namespace participants {

/**
* Class that manages the interaction between DDS Pipe (\c SchemaParticipant) and MCAP files through sql library.
* Payloads are efficiently passed from DDS Pipe to sql without copying data (only references).
*
* @implements BaseHandler
*/
class SqlHandler : public BaseHandler
{
public:

/**
* SqlHandler constructor by required values.
*
* Creates SqlHandler instance with given configuration, payload pool and initial state.
* Opens temporal MCAP file where data is to be written.
*
* @throw InitializationException if creation fails (fail to open MCAP file).
*
* @warning Command methods (\c start , \c pause , \c stop , and \c trigger_event) are not thread safe
* among themselves. This is, they are expected to be executed sequentially and all in the same thread.
*
* @param config: Structure encapsulating all configuration options.
* @param payload_pool: Owner of every payload contained in received messages.
* @param init_state: Initial instance state (RUNNING/PAUSED/STOPPED).
*/
DDSRECORDER_PARTICIPANTS_DllAPI
SqlHandler(
const SqlHandlerConfiguration& config,
const std::shared_ptr<ddspipe::core::PayloadPool>& payload_pool,
std::shared_ptr<ddsrecorder::participants::FileTracker> file_tracker,
const BaseHandlerStateCode& init_state = BaseHandlerStateCode::RUNNING,
const std::function<void()>& on_disk_full_lambda = nullptr);

/**
* @brief Destructor
*
* Closes temporal MCAP file, and renames it with filename given in configuration.
* Before closing file, received dynamic types are serialized and stored as an attachment.
*
*/
DDSRECORDER_PARTICIPANTS_DllAPI
virtual ~SqlHandler();

/**
* @brief Create and store in \c schemas_ an OMG IDL (.idl format) or ROS 2 (.msg format) schema.
* Any samples following this schema that were received before the schema itself are moved to the memory buffer
* to be written with the next batch.
* Previously created channels (for this type) associated with a blank schema are updated to use the new one.
*
* @param [in] dynamic_type DynamicType containing the type information required to generate the schema.
*/
DDSRECORDER_PARTICIPANTS_DllAPI
void add_schema(
const fastrtps::types::DynamicType_ptr& dynamic_type) override;

/**
* @brief Add a data sample to the given \c topic.
*
* If a channel with (non-blank) schema exists, the sample is saved in memory \c buffer_ .
* Otherwise:
* if RUNNING -> the sample is inserted into \c pending_samples_ queue if max pending samples is not 0.
* If 0, the sample is added to buffer without schema if allowed (only_with_schema not true),
* and discarded otherwise.
* if PAUSED -> the sample is inserted into \c pending_samples_paused_ queue.
*
* If instance is STOPPED, received data is not processed.
*
* @param [in] topic DDS topic associated to this sample.
* @param [in] data SqlMessage to be added.
*/
DDSRECORDER_PARTICIPANTS_DllAPI
void add_data(
const ddspipe::core::types::DdsTopic& topic,
ddspipe::core::types::RtpsPayloadData& data) override;

protected:

/**
* @brief Writes \c samples to disk.
*
* For each sample in \c samples, it downcasts it to \c SqlMessage, writes it to disk, and removes it from
* \c samples.
* The method ends when \c samples is empty.
*
* @param [in] samples List of samples to be written.
*/
void write_samples_(
std::list<std::shared_ptr<const BaseMessage>>& samples) override;

//! SQL writer
SqlWriter sql_writer_;
};

} /* namespace participants */
} /* namespace ddsrecorder */
} /* namespace eprosima */
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file SqlHandlerConfiguration.hpp
*/

#pragma once

#include <ddsrecorder_participants/recorder/output/BaseHandlerConfiguration.hpp>
#include <ddsrecorder_participants/recorder/output/OutputSettings.hpp>

namespace eprosima {
namespace ddsrecorder {
namespace participants {

/**
* Structure encapsulating the \c SqlHandler configuration options.
*/
struct SqlHandlerConfiguration : public BaseHandlerConfiguration
{
SqlHandlerConfiguration(
const OutputSettings& output_settings,
const int max_pending_samples,
const unsigned int buffer_size,
const unsigned int event_window,
const unsigned int cleanup_period,
const bool log_publishTime,
const bool only_with_schema,
const bool record_types,
const bool ros2_types)
: BaseHandlerConfiguration(
output_settings,
max_pending_samples,
buffer_size,
event_window,
cleanup_period,
log_publishTime,
only_with_schema,
record_types,
ros2_types)
{
}
};

} /* namespace participants */
} /* namespace ddsrecorder */
} /* namespace eprosima */
Loading

0 comments on commit 2bc8a6b

Please sign in to comment.