Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-clp): Add LZMA Compressor implementation and LZMA dependency. #614

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
dceb564
Add lzma download and port lzma scripts
Nov 25, 2024
d5af274
Make unit test pass
Nov 25, 2024
b94ca26
Refactor lzma compressor to group common functionalities into helplers
Nov 26, 2024
707c412
Improve comments
Nov 27, 2024
6d1ab8f
Fix reference link
Nov 27, 2024
89b5707
Add install for CentOS
Nov 27, 2024
c646cea
Apply coderabbit suggestions
Nov 27, 2024
c91e5fb
Remove decompressor related files
Nov 27, 2024
26b0663
Address review concerns
Nov 30, 2024
740bc1c
Address review concern
Dec 2, 2024
e2be883
Simplify else-if
Dec 2, 2024
905367d
Fix else-if
Dec 2, 2024
8ae88b2
Add lzma (xz) dep to MacOS
Dec 2, 2024
0d0c20e
Refactor helper run_lzma()
Dec 2, 2024
559485d
Update function doc
Dec 2, 2024
7c69c69
Clarify unit test early termination
Dec 2, 2024
a6d68b8
Update components/core/tests/test-StreamingCompression.cpp
Bill-hbrhbr Dec 2, 2024
1519c21
Split LZMA_RUN from flush actions
Dec 3, 2024
655bb46
Refactor unit test
Dec 3, 2024
4fb6c01
Update components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Bill-hbrhbr Dec 3, 2024
a8799b5
Merge edits
Dec 3, 2024
2b85f01
Fix import
Dec 3, 2024
eda7d6c
Apply suggestions from code review
Bill-hbrhbr Dec 4, 2024
4164a9d
Address review concern
Dec 4, 2024
8ab0653
Add a comment
Dec 5, 2024
c436f21
Apply suggestions from code review
Bill-hbrhbr Dec 6, 2024
7bd34d2
Update comment to 100-char length
Dec 6, 2024
efd2b27
Fix according to coding style guidelines
Dec 11, 2024
c530f92
Apply suggestions from code review
Bill-hbrhbr Dec 12, 2024
e751ee6
Update CMakeLists.txt
Dec 12, 2024
1c5efcd
Address review concern
Dec 12, 2024
856c7cb
Update TODO
Dec 12, 2024
43e22d2
Case fix
Dec 12, 2024
829a6b2
Remove unnecessary function inline comments
Dec 12, 2024
81e1807
Improve comment
Dec 12, 2024
09b73c7
Refactor lzma stream related functions into a nested helper class
Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/core/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ IncludeCategories:
# NOTE: A header is grouped by first matching regex
# Library headers. Update when adding new libraries.
# NOTE: clang-format retains leading white-space on a line in violation of the YAML spec.
- Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|mongocxx\
- Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|lzma|mongocxx\
|msgpack|mysql|openssl|outcome|regex_utils|simdjson|spdlog|sqlite3|string_utils|yaml-cpp|zstd)"
Priority: 3
# C system headers
Expand Down
26 changes: 24 additions & 2 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Set general compressor
set(GENERAL_COMPRESSOR "zstd" CACHE STRING "The general-purpose compressor used as the 2nd-stage compressor")
set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd)
if ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough")
set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS lzma passthrough zstd)
if ("${GENERAL_COMPRESSOR}" STREQUAL "lzma")
add_definitions(-DUSE_LZMA_COMPRESSION=1)
message(STATUS "Using Lempel–Ziv–Markov chain Algorithm compression")
elseif ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough")
add_definitions(-DUSE_PASSTHROUGH_COMPRESSION=1)
message(STATUS "Using passthrough compression")
elseif ("${GENERAL_COMPRESSOR}" STREQUAL "zstd")
Expand Down Expand Up @@ -224,6 +227,21 @@ else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for ZStd")
endif()

# Find and setup LZMA Library
# TODO: Add a script in ./cmake/Modules to properly import LZMA in find_package()'s module mode
if(CLP_USE_STATIC_LIBS)
set(LIBLZMA_USE_STATIC_LIBS ON)
endif()
find_package(LibLZMA REQUIRED)
if(LIBLZMA_FOUND)
message(STATUS "Found Lzma ${LIBLZMA_VERSION_STRING}")
message(STATUS "Lzma library location: ${LIBLZMA_LIBRARIES}")
message(STATUS "Lzma Include Dir: ${LIBLZMA_INCLUDE_DIRS}")
else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for Lzma")
endif()
include_directories(${LIBLZMA_INCLUDE_DIRS})

# sqlite dependencies
set(sqlite_DYNAMIC_LIBS "dl;m;pthread")
include(cmake/Modules/FindLibraryDependencies.cmake)
Expand Down Expand Up @@ -462,6 +480,9 @@ set(SOURCE_FILES_unitTest
src/clp/streaming_compression/Compressor.hpp
src/clp/streaming_compression/Constants.hpp
src/clp/streaming_compression/Decompressor.hpp
src/clp/streaming_compression/lzma/Compressor.cpp
src/clp/streaming_compression/lzma/Compressor.hpp
src/clp/streaming_compression/lzma/Constants.hpp
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
src/clp/streaming_compression/passthrough/Compressor.cpp
src/clp/streaming_compression/passthrough/Compressor.hpp
src/clp/streaming_compression/passthrough/Decompressor.cpp
Expand Down Expand Up @@ -549,6 +570,7 @@ target_link_libraries(unitTest
clp::regex_utils
clp::string_utils
yaml-cpp::yaml-cpp
${LIBLZMA_LIBRARIES}
ZStd::ZStd
)
target_compile_features(unitTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace clp::streaming_compression {
enum class CompressorType : uint8_t {
ZSTD = 0x10,
LZMA = 0x20,
Passthrough = 0xFF,
};
} // namespace clp::streaming_compression
Expand Down
274 changes: 274 additions & 0 deletions components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
#include "Compressor.hpp"

#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

#include <lzma.h>
#include <spdlog/spdlog.h>

#include "../../Array.hpp"
#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../type_utils.hpp"
#include "Constants.hpp"

namespace {
using clp::Array;
using clp::streaming_compression::lzma::Compressor;

/**
* Attaches a pre-allocated block buffer to encoder's output stream
*
* Subsequent calls to this function resets the output buffer to its initial state.
* @param stream
* @param out_buffer
*/
auto attach_stream_output_buffer(lzma_stream* stream, Array<uint8_t>& out_buffer) -> void;

auto detach_stream_input_src(lzma_stream* stream) -> void;

auto detach_stream_output_buffer(lzma_stream* stream) -> void;

auto is_flush_action(lzma_action action) -> bool;

/**
* Initializes an LZMA compression encoder and its streams
*
* @param stream A pre-allocated `lzma_stream` object that is to be initialized
* @param compression_level
* @param dict_size Dictionary size that specifies how many bytes of the
* recently processed uncompressed data to keep in the memory
* @param check Type of integrity check calculated from the uncompressed data. LZMA_CHECK_CRC64 is
* the default in the xz command line tool. If the .xz file needs to be decompressed
* with XZ-Embedded, use LZMA_CHECK_CRC32 instead.
*/
auto init_lzma_encoder(
lzma_stream* stream,
int compression_level,
size_t dict_size,
lzma_check check = LZMA_CHECK_CRC64
) -> void;

auto attach_stream_output_buffer(lzma_stream* stream, Array<uint8_t>& out_buffer) -> void {
stream->next_out = out_buffer.data();
stream->avail_out = out_buffer.size();
}

auto detach_stream_input_src(lzma_stream* stream) -> void {
stream->next_in = nullptr;
stream->avail_in = 0;
}

auto detach_stream_output_buffer(lzma_stream* stream) -> void {
stream->next_out = nullptr;
stream->avail_out = 0;
}

auto is_flush_action(lzma_action action) -> bool {
return LZMA_SYNC_FLUSH == action || LZMA_FULL_FLUSH == action || LZMA_FULL_BARRIER == action
|| LZMA_FINISH == action;
}

auto init_lzma_encoder(
lzma_stream* stream,
int compression_level,
size_t dict_size,
lzma_check check
) -> void {
lzma_options_lzma options;
if (0 != lzma_lzma_preset(&options, compression_level)) {
SPDLOG_ERROR("Failed to initialize LZMA options' compression level.");
throw Compressor::OperationFailed(clp::ErrorCode_BadParam, __FILENAME__, __LINE__);
}
options.dict_size = dict_size;
std::array<lzma_filter, 2> filters{{
{.id = LZMA_FILTER_LZMA2, .options = &options},
{.id = LZMA_VLI_UNKNOWN, .options = nullptr},
}};

auto const rc = lzma_stream_encoder(stream, filters.data(), check);
if (LZMA_OK == rc) {
return;
}

char const* msg{nullptr};
switch (rc) {
case LZMA_MEM_ERROR:
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
msg = "Memory allocation failed";
break;

case LZMA_OPTIONS_ERROR:
msg = "Specified preset is not supported";
break;

case LZMA_UNSUPPORTED_CHECK:
msg = "Specified integrity check is not supported";
break;

case LZMA_PROG_ERROR:
msg = "Input arguments are not sane";
break;

default:
msg = "Unknown error";
break;
}

SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, static_cast<int>(rc));
throw Compressor::OperationFailed(clp::ErrorCode_BadParam, __FILENAME__, __LINE__);
}
} // namespace

namespace clp::streaming_compression::lzma {
auto Compressor::open(FileWriter& file_writer, int compression_level) -> void {
if (nullptr != m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

if (compression_level < cMinCompressionLevel || compression_level > cMaxCompressionLevel) {
throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__);
}

m_compression_stream = LZMA_STREAM_INIT;
init_lzma_encoder(&m_compression_stream, compression_level, m_dict_size);
detach_stream_input_src(&m_compression_stream);
attach_stream_output_buffer(&m_compression_stream, m_compressed_stream_block_buffer);
m_compressed_stream_file_writer = &file_writer;
m_uncompressed_stream_pos = 0;
}

auto Compressor::close() -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (m_compression_stream.avail_in > 0) {
SPDLOG_ERROR("Tried to close LZMA compressor with unprocessed input data.");
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
Comment on lines +39 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Process any remaining input data before closing the compressor.

In close(), if there is unprocessed input data, consider processing it rather than throwing an exception to ensure all data is compressed.

Apply this diff to process remaining data:

 if (m_compression_stream.avail_in > 0) {
-    SPDLOG_ERROR("Tried to close LZMA compressor with unprocessed input data.");
-    throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
+    SPDLOG_WARN("Closing compressor with unprocessed input data. Processing remaining data.");
+    encode_lzma();
 }

Committable suggestion skipped: line range outside the PR's diff.


flush_lzma(LZMA_FINISH);
// Deallocates LZMA stream's internal data structures
lzma_end(&m_compression_stream);
detach_stream_output_buffer(&m_compression_stream);
m_compressed_stream_file_writer = nullptr;
}

auto Compressor::write(char const* data, size_t data_length) -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (0 == data_length) {
return;
}

if (nullptr == data) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

m_compression_stream.next_in = clp::size_checked_pointer_cast<uint8_t const>(data);
m_compression_stream.avail_in = data_length;
encode_lzma();
detach_stream_input_src(&m_compression_stream);
m_uncompressed_stream_pos += data_length;
}

auto Compressor::flush() -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
flush_lzma(LZMA_SYNC_FLUSH);
}

auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode {
if (nullptr == m_compressed_stream_file_writer) {
return ErrorCode_NotInit;
}

pos = m_uncompressed_stream_pos;
return ErrorCode_Success;
}

auto Compressor::encode_lzma() -> void {
while (m_compression_stream.avail_in > 0) {
if (0 == m_compression_stream.avail_out) {
flush_stream_output_block_buffer();
}

auto const rc = lzma_code(&m_compression_stream, LZMA_RUN);
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
switch (rc) {
case LZMA_OK:
break;
case LZMA_BUF_ERROR:
SPDLOG_ERROR(
"LZMA compressor input stream is corrupt. No encoding progress can be made."
);
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
default:
SPDLOG_ERROR(
"lzma_code() returned an unexpected value - {}.",
static_cast<int>(rc)
);
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
}
}

auto Compressor::flush_lzma(lzma_action flush_action) -> void {
if (false == is_flush_action(flush_action)) {
SPDLOG_ERROR(
"lzma_code() supplied with invalid flush action - {}.",
static_cast<int>(flush_action)
);
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

bool flushed{false};
while (false == flushed) {
if (0 == m_compression_stream.avail_out) {
flush_stream_output_block_buffer();
}

auto const rc = lzma_code(&m_compression_stream, flush_action);
switch (rc) {
case LZMA_OK:
break;
case LZMA_STREAM_END:
// NOTE: flush may not have completed if a multithreaded encoder is using action
// LZMA_FULL_BARRIER. For now, we skip this check.
flushed = true;
break;
case LZMA_BUF_ERROR:
// NOTE: this can happen if we are using LZMA_FULL_FLUSH or LZMA_FULL_BARRIER. These
// two actions keeps encoding input data alongside flushing buffered encoded data.
SPDLOG_ERROR(
"LZMA compressor input stream is corrupt. No encoding progress can be made."
);
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
default:
SPDLOG_ERROR(
"lzma_code() returned an unexpected value - {}.",
static_cast<int>(rc)
);
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
}

flush_stream_output_block_buffer();
}

auto Compressor::flush_stream_output_block_buffer() -> void {
if (cCompressedStreamBlockBufferSize == m_compression_stream.avail_out) {
return;
}
m_compressed_stream_file_writer->write(
clp::size_checked_pointer_cast<char>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_compression_stream.avail_out
);
attach_stream_output_buffer(&m_compression_stream, m_compressed_stream_block_buffer);
}
} // namespace clp::streaming_compression::lzma
Loading
Loading