Skip to content

Commit

Permalink
Refactor lzma stream related functions into a nested helper class
Browse files Browse the repository at this point in the history
  • Loading branch information
Bingran Hu committed Dec 17, 2024
1 parent 81e1807 commit 09b73c7
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 127 deletions.
194 changes: 73 additions & 121 deletions components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,134 +8,25 @@
#include <lzma.h>
#include <spdlog/spdlog.h>

#include "../../Array.hpp"
#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../type_utils.hpp"
#include "Constants.hpp"

namespace {
using clp::Array;
using clp::streaming_compression::lzma::Compressor;

/**
* Attaches a pre-allocated block buffer to encoder's output stream
*
* Subsequent calls to this function resets the output buffer to its initial state.
* @param stream
* @param out_buffer
*/
auto attach_stream_output_buffer(lzma_stream* stream, Array<uint8_t>& out_buffer) -> void;

auto detach_stream_input_src(lzma_stream* stream) -> void;

auto detach_stream_output_buffer(lzma_stream* stream) -> void;

auto is_flush_action(lzma_action action) -> bool;

/**
* Initializes an LZMA compression encoder and its streams
*
* @param stream A pre-allocated `lzma_stream` object that is to be initialized
* @param compression_level
* @param dict_size Dictionary size that specifies how many bytes of the
* recently processed uncompressed data to keep in the memory
* @param check Type of integrity check calculated from the uncompressed data. LZMA_CHECK_CRC64 is
* the default in the xz command line tool. If the .xz file needs to be decompressed
* with XZ-Embedded, use LZMA_CHECK_CRC32 instead.
*/
auto init_lzma_encoder(
lzma_stream* stream,
int compression_level,
size_t dict_size,
lzma_check check = LZMA_CHECK_CRC64
) -> void;

auto attach_stream_output_buffer(lzma_stream* stream, Array<uint8_t>& out_buffer) -> void {
stream->next_out = out_buffer.data();
stream->avail_out = out_buffer.size();
}

auto detach_stream_input_src(lzma_stream* stream) -> void {
stream->next_in = nullptr;
stream->avail_in = 0;
}

auto detach_stream_output_buffer(lzma_stream* stream) -> void {
stream->next_out = nullptr;
stream->avail_out = 0;
}

auto is_flush_action(lzma_action action) -> bool {
return LZMA_SYNC_FLUSH == action || LZMA_FULL_FLUSH == action || LZMA_FULL_BARRIER == action
|| LZMA_FINISH == action;
}

auto init_lzma_encoder(
lzma_stream* stream,
int compression_level,
size_t dict_size,
lzma_check check
) -> void {
lzma_options_lzma options;
if (0 != lzma_lzma_preset(&options, compression_level)) {
SPDLOG_ERROR("Failed to initialize LZMA options' compression level.");
throw Compressor::OperationFailed(clp::ErrorCode_BadParam, __FILENAME__, __LINE__);
}
options.dict_size = dict_size;
std::array<lzma_filter, 2> filters{{
{.id = LZMA_FILTER_LZMA2, .options = &options},
{.id = LZMA_VLI_UNKNOWN, .options = nullptr},
}};

auto const rc = lzma_stream_encoder(stream, filters.data(), check);
if (LZMA_OK == rc) {
return;
}

char const* msg{nullptr};
switch (rc) {
case LZMA_MEM_ERROR:
msg = "Memory allocation failed";
break;

case LZMA_OPTIONS_ERROR:
msg = "Specified preset is not supported";
break;

case LZMA_UNSUPPORTED_CHECK:
msg = "Specified integrity check is not supported";
break;

case LZMA_PROG_ERROR:
msg = "Input arguments are not sane";
break;

default:
msg = "Unknown error";
break;
}

SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, static_cast<int>(rc));
throw Compressor::OperationFailed(clp::ErrorCode_BadParam, __FILENAME__, __LINE__);
}
} // namespace

namespace clp::streaming_compression::lzma {
auto Compressor::open(FileWriter& file_writer, int compression_level) -> void {
if (nullptr != m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

if (compression_level < cMinCompressionLevel || compression_level > cMaxCompressionLevel) {
throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__);
}
m_compression_level = compression_level;

m_compression_stream = LZMA_STREAM_INIT;
init_lzma_encoder(&m_compression_stream, compression_level, m_dict_size);
detach_stream_input_src(&m_compression_stream);
attach_stream_output_buffer(&m_compression_stream, m_compressed_stream_block_buffer);
m_lzma_ops.init_lzma_encoder();
m_lzma_ops.detach_input_src();
m_lzma_ops.attach_output_buffer();
m_compressed_stream_file_writer = &file_writer;
m_uncompressed_stream_pos = 0;
}
Expand All @@ -153,7 +44,7 @@ auto Compressor::close() -> void {
flush_lzma(LZMA_FINISH);
// Deallocates LZMA stream's internal data structures
lzma_end(&m_compression_stream);
detach_stream_output_buffer(&m_compression_stream);
m_lzma_ops.detach_output_buffer();
m_compressed_stream_file_writer = nullptr;
}

Expand All @@ -173,7 +64,7 @@ auto Compressor::write(char const* data, size_t data_length) -> void {
m_compression_stream.next_in = clp::size_checked_pointer_cast<uint8_t const>(data);
m_compression_stream.avail_in = data_length;
encode_lzma();
detach_stream_input_src(&m_compression_stream);
m_lzma_ops.detach_input_src();
m_uncompressed_stream_pos += data_length;
}

Expand All @@ -188,7 +79,6 @@ auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode {
if (nullptr == m_compressed_stream_file_writer) {
return ErrorCode_NotInit;
}

pos = m_uncompressed_stream_pos;
return ErrorCode_Success;
}
Expand All @@ -198,7 +88,6 @@ auto Compressor::encode_lzma() -> void {
if (0 == m_compression_stream.avail_out) {
flush_stream_output_block_buffer();
}

auto const rc = lzma_code(&m_compression_stream, LZMA_RUN);
switch (rc) {
case LZMA_OK:
Expand All @@ -219,7 +108,7 @@ auto Compressor::encode_lzma() -> void {
}

auto Compressor::flush_lzma(lzma_action flush_action) -> void {
if (false == is_flush_action(flush_action)) {
if (false == LzmaStreamOperations::is_flush_action(flush_action)) {
SPDLOG_ERROR(
"lzma_code() supplied with invalid flush action - {}.",
static_cast<int>(flush_action)
Expand All @@ -232,7 +121,6 @@ auto Compressor::flush_lzma(lzma_action flush_action) -> void {
if (0 == m_compression_stream.avail_out) {
flush_stream_output_block_buffer();
}

auto const rc = lzma_code(&m_compression_stream, flush_action);
switch (rc) {
case LZMA_OK:
Expand All @@ -257,7 +145,6 @@ auto Compressor::flush_lzma(lzma_action flush_action) -> void {
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
}

flush_stream_output_block_buffer();
}

Expand All @@ -269,6 +156,71 @@ auto Compressor::flush_stream_output_block_buffer() -> void {
clp::size_checked_pointer_cast<char>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_compression_stream.avail_out
);
attach_stream_output_buffer(&m_compression_stream, m_compressed_stream_block_buffer);
m_lzma_ops.attach_output_buffer();
}

auto Compressor::LzmaStreamOperations::is_flush_action(lzma_action action) -> bool {
return LZMA_SYNC_FLUSH == action || LZMA_FULL_FLUSH == action || LZMA_FULL_BARRIER == action
|| LZMA_FINISH == action;
}

auto Compressor::LzmaStreamOperations::attach_output_buffer() -> void {
m_p->m_compression_stream.next_out = m_p->m_compressed_stream_block_buffer.data();
m_p->m_compression_stream.avail_out = m_p->m_compressed_stream_block_buffer.size();
}

auto Compressor::LzmaStreamOperations::detach_input_src() -> void {
m_p->m_compression_stream.next_in = nullptr;
m_p->m_compression_stream.avail_in = 0;
}

auto Compressor::LzmaStreamOperations::detach_output_buffer() -> void {
m_p->m_compression_stream.next_out = nullptr;
m_p->m_compression_stream.avail_out = 0;
}

auto Compressor::LzmaStreamOperations::init_lzma_encoder(lzma_check check) -> void {
lzma_options_lzma options;
if (0 != lzma_lzma_preset(&options, m_p->m_compression_level)) {
SPDLOG_ERROR("Failed to initialize LZMA options' compression level.");
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
options.dict_size = m_p->m_dict_size;
std::array<lzma_filter, 2> filters{{
{.id = LZMA_FILTER_LZMA2, .options = &options},
{.id = LZMA_VLI_UNKNOWN, .options = nullptr},
}};

m_p->m_compression_stream = LZMA_STREAM_INIT;
auto const rc = lzma_stream_encoder(&m_p->m_compression_stream, filters.data(), check);
if (LZMA_OK == rc) {
return;
}

char const* msg{nullptr};
switch (rc) {
case LZMA_MEM_ERROR:
msg = "Memory allocation failed";
break;

case LZMA_OPTIONS_ERROR:
msg = "Specified preset is not supported";
break;

case LZMA_UNSUPPORTED_CHECK:
msg = "Specified integrity check is not supported";
break;

case LZMA_PROG_ERROR:
msg = "Input arguments are not sane";
break;

default:
msg = "Unknown error";
break;
}

SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, static_cast<int>(rc));
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
} // namespace clp::streaming_compression::lzma
56 changes: 50 additions & 6 deletions components/core/src/clp/streaming_compression/lzma/Compressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#define CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP

#include <cstddef>
#include <memory>
#include <cstdint>

#include <lzma.h>

Expand Down Expand Up @@ -89,6 +89,48 @@ class Compressor : public ::clp::streaming_compression::Compressor {
}

private:
class LzmaStreamOperations {
public:
// Constructor
LzmaStreamOperations(Compressor* parent) : m_p(parent) {}

// Destructor
~LzmaStreamOperations() = default;

// Delete copy constructor and assignment operator
LzmaStreamOperations(LzmaStreamOperations const&) = delete;
auto operator=(LzmaStreamOperations const&) -> LzmaStreamOperations& = delete;

// Default move constructor and assignment operator
LzmaStreamOperations(LzmaStreamOperations&&) noexcept = default;
auto operator=(LzmaStreamOperations&&) noexcept -> LzmaStreamOperations& = default;

[[nodiscard]] static auto is_flush_action(lzma_action action) -> bool;

/**
* Attaches a pre-allocated block buffer to the encoder's output stream
*
* Subsequent calls to this function resets the output buffer to its initial state.
*/
auto attach_output_buffer() -> void;

auto detach_input_src() -> void;

auto detach_output_buffer() -> void;

/**
* Initializes an LZMA compression encoder and its streams
*
* @param check Type of integrity check calculated from the uncompressed data.
* LZMA_CHECK_CRC64 is the default in the xz command line tool. If the .xz file needs to be
* decompressed with XZ-Embedded, use LZMA_CHECK_CRC32 instead.
*/
auto init_lzma_encoder(lzma_check check = LZMA_CHECK_CRC64) -> void;

private:
Compressor* m_p;
};

static constexpr size_t cCompressedStreamBlockBufferSize{4096}; // 4KiB

/**
Expand Down Expand Up @@ -119,19 +161,21 @@ class Compressor : public ::clp::streaming_compression::Compressor {

/**
* Flushes the current compressed data in the output block buffer to the output file handler.
* Reset the output block buffer to receive new data.
*
* Also resets the output block buffer to receive new data.
*/
auto flush_stream_output_block_buffer() -> void;

// Variables
FileWriter* m_compressed_stream_file_writer{nullptr};

// Compressed stream variables
lzma_stream m_compression_stream;
size_t m_dict_size{cDefaultDictionarySize};

LzmaStreamOperations m_lzma_ops{this};
Array<uint8_t> m_compressed_stream_block_buffer{cCompressedStreamBlockBufferSize};

int m_compression_level{cDefaultCompressionLevel};
lzma_stream m_compression_stream = LZMA_STREAM_INIT;
// Specifies how many bytes of the recently processed uncompressed data to keep in the memory
size_t m_dict_size{cDefaultDictionarySize};
size_t m_uncompressed_stream_pos{0};
};
} // namespace clp::streaming_compression::lzma
Expand Down

0 comments on commit 09b73c7

Please sign in to comment.