Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-clp): Add LZMA Compressor implementation and LZMA dependency. #614

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
dceb564
Add lzma download and port lzma scripts
Nov 25, 2024
d5af274
Make unit test pass
Nov 25, 2024
b94ca26
Refactor lzma compressor to group common functionalities into helplers
Nov 26, 2024
707c412
Improve comments
Nov 27, 2024
6d1ab8f
Fix reference link
Nov 27, 2024
89b5707
Add install for CentOS
Nov 27, 2024
c646cea
Apply coderabbit suggestions
Nov 27, 2024
c91e5fb
Remove decompressor related files
Nov 27, 2024
26b0663
Address review concerns
Nov 30, 2024
740bc1c
Address review concern
Dec 2, 2024
e2be883
Simplify else-if
Dec 2, 2024
905367d
Fix else-if
Dec 2, 2024
8ae88b2
Add lzma (xz) dep to MacOS
Dec 2, 2024
0d0c20e
Refactor helper run_lzma()
Dec 2, 2024
559485d
Update function doc
Dec 2, 2024
7c69c69
Clarify unit test early termination
Dec 2, 2024
a6d68b8
Update components/core/tests/test-StreamingCompression.cpp
Bill-hbrhbr Dec 2, 2024
1519c21
Split LZMA_RUN from flush actions
Dec 3, 2024
655bb46
Refactor unit test
Dec 3, 2024
4fb6c01
Update components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Bill-hbrhbr Dec 3, 2024
a8799b5
Merge edits
Dec 3, 2024
2b85f01
Fix import
Dec 3, 2024
eda7d6c
Apply suggestions from code review
Bill-hbrhbr Dec 4, 2024
4164a9d
Address review concern
Dec 4, 2024
8ab0653
Add a comment
Dec 5, 2024
c436f21
Apply suggestions from code review
Bill-hbrhbr Dec 6, 2024
7bd34d2
Update comment to 100-char length
Dec 6, 2024
efd2b27
Fix according to coding style guidelines
Dec 11, 2024
c530f92
Apply suggestions from code review
Bill-hbrhbr Dec 12, 2024
e751ee6
Update CMakeLists.txt
Dec 12, 2024
1c5efcd
Address review concern
Dec 12, 2024
856c7cb
Update TODO
Dec 12, 2024
43e22d2
Case fix
Dec 12, 2024
829a6b2
Remove unnecessary function inline comments
Dec 12, 2024
81e1807
Improve comment
Dec 12, 2024
09b73c7
Refactor lzma stream related functions into a nested helper class
Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/core/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ IncludeCategories:
# NOTE: A header is grouped by first matching regex
# Library headers. Update when adding new libraries.
# NOTE: clang-format retains leading white-space on a line in violation of the YAML spec.
- Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|mongocxx\
- Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|lzma|mongocxx\
|msgpack|mysql|openssl|outcome|regex_utils|simdjson|spdlog|sqlite3|string_utils|yaml-cpp|zstd)"
Priority: 3
# C system headers
Expand Down
24 changes: 22 additions & 2 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Set general compressor
set(GENERAL_COMPRESSOR "zstd" CACHE STRING "The general-purpose compressor used as the 2nd-stage compressor")
set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd)
if ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough")
set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS lzma passthrough zstd)
if ("${GENERAL_COMPRESSOR}" STREQUAL "lzma")
add_definitions(-DUSE_LZMA_COMPRESSION=1)
message(STATUS "Using Lempel–Ziv–Markov chain Algorithm compression")
elseif ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough")
add_definitions(-DUSE_PASSTHROUGH_COMPRESSION=1)
message(STATUS "Using passthrough compression")
elseif ("${GENERAL_COMPRESSOR}" STREQUAL "zstd")
Expand Down Expand Up @@ -224,6 +227,19 @@ else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for ZStd")
endif()

# Find and setup LZMA Library
# TODO: Add support to enforce static linking against LZMA when desired. For a hack, we can set
# `CMAKE_FIND_LIBRARY_SUFFIXES` to ask CMake to prefer the static lib over the shared one.
find_package(LibLZMA REQUIRED)
if(LIBLZMA_FOUND)
message(STATUS "Found Lzma ${LIBLZMA_VERSION_STRING}")
message(STATUS "Lzma library location: ${LIBLZMA_LIBRARIES}")
message(STATUS "Lzma Include Dir: ${LIBLZMA_INCLUDE_DIRS}")
else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for Lzma")
endif()
include_directories(${LIBLZMA_INCLUDE_DIRS})

# sqlite dependencies
set(sqlite_DYNAMIC_LIBS "dl;m;pthread")
include(cmake/Modules/FindLibraryDependencies.cmake)
Expand Down Expand Up @@ -462,6 +478,9 @@ set(SOURCE_FILES_unitTest
src/clp/streaming_compression/Compressor.hpp
src/clp/streaming_compression/Constants.hpp
src/clp/streaming_compression/Decompressor.hpp
src/clp/streaming_compression/lzma/Compressor.cpp
src/clp/streaming_compression/lzma/Compressor.hpp
src/clp/streaming_compression/lzma/Constants.hpp
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
src/clp/streaming_compression/passthrough/Compressor.cpp
src/clp/streaming_compression/passthrough/Compressor.hpp
src/clp/streaming_compression/passthrough/Decompressor.cpp
Expand Down Expand Up @@ -549,6 +568,7 @@ target_link_libraries(unitTest
clp::regex_utils
clp::string_utils
yaml-cpp::yaml-cpp
${LIBLZMA_LIBRARIES}
ZStd::ZStd
)
target_compile_features(unitTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace clp::streaming_compression {
enum class CompressorType : uint8_t {
ZSTD = 0x10,
LZMA = 0x20,
Passthrough = 0xFF,
};
} // namespace clp::streaming_compression
Expand Down
263 changes: 263 additions & 0 deletions components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
#include "Compressor.hpp"

#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

#include <lzma.h>
#include <spdlog/spdlog.h>

#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../type_utils.hpp"
#include "Constants.hpp"

namespace {
using clp::streaming_compression::lzma::Compressor;

auto is_flush_action(lzma_action action) -> bool;

/**
* Initializes the LZMA compression stream
* @param stream A pre-allocated `lzma_stream` object
* @param compression_level
* @param dict_size Dictionary size that specifies how many bytes of the
* recently processed uncompressed data to keep in the memory
*/
auto init_lzma_encoder(lzma_stream* stream, int compression_level, size_t dict_size) -> void;

auto is_flush_action(lzma_action action) -> bool {
return LZMA_SYNC_FLUSH == action || LZMA_FULL_FLUSH == action || LZMA_FULL_BARRIER == action
|| LZMA_FINISH == action;
}

auto init_lzma_encoder(lzma_stream* stream, int compression_level, size_t dict_size) -> void {
lzma_options_lzma options;
if (0 != lzma_lzma_preset(&options, compression_level)) {
SPDLOG_ERROR("Failed to initialize LZMA options' compression level.");
throw Compressor::OperationFailed(clp::ErrorCode_BadParam, __FILENAME__, __LINE__);
}
options.dict_size = dict_size;
std::array<lzma_filter, 2> filters{{
{.id = LZMA_FILTER_LZMA2, .options = &options},
{.id = LZMA_VLI_UNKNOWN, .options = nullptr},
}};

// Initializes the encoder using a preset. Set the integrity to check to CRC64, which is the
// default in the xz command line tool. If the .xz file needs to be decompressed with
// XZ-Embedded, use LZMA_CHECK_CRC32 instead.
auto const rc = lzma_stream_encoder(stream, filters.data(), LZMA_CHECK_CRC64);

if (LZMA_OK == rc) {
return;
}

// Something went wrong. The possible errors are documented in lzma/container.h
// (src/liblzma/api/lzma/container.h in the source package or e.g. /usr/include/lzma/container.h
// depending on the install prefix).
char const* msg{nullptr};
switch (rc) {
case LZMA_MEM_ERROR:
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
msg = "Memory allocation failed";
break;

case LZMA_OPTIONS_ERROR:
msg = "Specified preset is not supported";
break;

case LZMA_UNSUPPORTED_CHECK:
msg = "Specified integrity check is not supported";
break;

case LZMA_PROG_ERROR:
msg = "Input arguments are not sane";
break;

default:
msg = "Unknown error";
break;
}

SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, static_cast<int>(rc));
throw Compressor::OperationFailed(clp::ErrorCode_BadParam, __FILENAME__, __LINE__);
}
} // namespace

namespace clp::streaming_compression::lzma {
auto Compressor::open(FileWriter& file_writer, int compression_level) -> void {
if (nullptr != m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

if (compression_level < cMinCompressionLevel || compression_level > cMaxCompressionLevel) {
throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__);
}

m_compression_stream = LZMA_STREAM_INIT;
init_lzma_encoder(&m_compression_stream, compression_level, m_dict_size);

// No input upon initialization
m_compression_stream.next_in = nullptr;
m_compression_stream.avail_in = 0;

// Attach output buffer to LZMA stream
m_compression_stream.next_out = m_compressed_stream_block_buffer.data();
m_compression_stream.avail_out = m_compressed_stream_block_buffer.size();

m_compressed_stream_file_writer = &file_writer;

m_uncompressed_stream_pos = 0;
}

auto Compressor::close() -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (m_compression_stream.avail_in > 0) {
SPDLOG_ERROR("Tried to close LZMA compressor with unprocessed input data.");
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
Comment on lines +39 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Process any remaining input data before closing the compressor.

In close(), if there is unprocessed input data, consider processing it rather than throwing an exception to ensure all data is compressed.

Apply this diff to process remaining data:

 if (m_compression_stream.avail_in > 0) {
-    SPDLOG_ERROR("Tried to close LZMA compressor with unprocessed input data.");
-    throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
+    SPDLOG_WARN("Closing compressor with unprocessed input data. Processing remaining data.");
+    encode_lzma();
 }

Committable suggestion skipped: line range outside the PR's diff.


flush_lzma(LZMA_FINISH);

// Deallocates LZMA stream's internal data structures
lzma_end(&m_compression_stream);

// Detach output buffer from LZMA stream
m_compression_stream.next_out = nullptr;
m_compression_stream.avail_out = 0;

m_compressed_stream_file_writer = nullptr;
}

auto Compressor::write(char const* data, size_t data_length) -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (0 == data_length) {
// Nothing needs to be done because we do not need to compress anything
return;
}

if (nullptr == data) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

// Attach input data to LZMA stream
m_compression_stream.next_in = clp::size_checked_pointer_cast<uint8_t const>(data);
m_compression_stream.avail_in = data_length;

encode_lzma();

// All input data have been encoded so detach input data
m_compression_stream.next_in = nullptr;
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
m_compression_stream.avail_in = 0;

m_uncompressed_stream_pos += data_length;
}

auto Compressor::flush() -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
flush_lzma(LZMA_SYNC_FLUSH);
}

auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode {
if (nullptr == m_compressed_stream_file_writer) {
return ErrorCode_NotInit;
}

pos = m_uncompressed_stream_pos;
return ErrorCode_Success;
}

auto Compressor::encode_lzma() -> void {
while (m_compression_stream.avail_in > 0) {
// Write output buffer to file if it's full
if (0 == m_compression_stream.avail_out) {
flush_stream_output_block_buffer();
}

auto const rc = lzma_code(&m_compression_stream, LZMA_RUN);
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
switch (rc) {
case LZMA_OK:
break;
case LZMA_BUF_ERROR: // No encoding progress can be made
SPDLOG_ERROR("LZMA compressor input stream is corrupt.");
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
default:
SPDLOG_ERROR(
"lzma_code() returned an unexpected value - {}.",
static_cast<int>(rc)
);
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
}
}

auto Compressor::flush_lzma(lzma_action flush_action) -> void {
if (false == is_flush_action(flush_action)) {
SPDLOG_ERROR(
"lzma_code() supplied with invalid flush action - {}.",
static_cast<int>(flush_action)
);
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

/**
* Once flushing starts, the workflow action needs to stay the same until flushing is signaled
* complete by LZMA (aka LZMA_STREAM_END is reached).
* See also: https://github.com/tukaani-project/xz/blob/master/src/liblzma/api/lzma/base.h#L274
*/
bool flushed{false};
while (false == flushed) {
// Write output buffer to file if it's full
if (0 == m_compression_stream.avail_out) {
flush_stream_output_block_buffer();
}

auto const rc = lzma_code(&m_compression_stream, flush_action);
switch (rc) {
case LZMA_OK:
break;
case LZMA_STREAM_END:
// NOTE: this might not be true when multithreaded encoder is used with
// LZMA_FULL_BARRIER. For now, we skip this check.
flushed = true;
break;
case LZMA_BUF_ERROR: // No encoding progress can be made
// NOTE: this can happen if we are using LZMA_FULL_FLUSH or LZMA_FULL_BARRIER. These
// two actions keeps encoding input data alongside flushing buffered encoded data.
SPDLOG_ERROR("LZMA compressor input stream is corrupt.");
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
default:
SPDLOG_ERROR(
"lzma_code() returned an unexpected value - {}.",
static_cast<int>(rc)
);
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
}

// Write the last chunk of output
flush_stream_output_block_buffer();
}

auto Compressor::flush_stream_output_block_buffer() -> void {
if (cCompressedStreamBlockBufferSize == m_compression_stream.avail_out) {
// Nothing to flush
return;
}
m_compressed_stream_file_writer->write(
clp::size_checked_pointer_cast<char>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_compression_stream.avail_out
);
m_compression_stream.next_out = m_compressed_stream_block_buffer.data();
m_compression_stream.avail_out = cCompressedStreamBlockBufferSize;
}
} // namespace clp::streaming_compression::lzma
Loading
Loading