Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-clp): Add LZMA Compressor implementation and LZMA dependency. #614

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
dceb564
Add lzma download and port lzma scripts
Nov 25, 2024
d5af274
Make unit test pass
Nov 25, 2024
b94ca26
Refactor lzma compressor to group common functionalities into helpers
Nov 26, 2024
707c412
Improve comments
Nov 27, 2024
6d1ab8f
Fix reference link
Nov 27, 2024
89b5707
Add install for CentOS
Nov 27, 2024
c646cea
Apply coderabbit suggestions
Nov 27, 2024
c91e5fb
Remove decompressor related files
Nov 27, 2024
26b0663
Address review concerns
Nov 30, 2024
740bc1c
Address review concern
Dec 2, 2024
e2be883
Simplify else-if
Dec 2, 2024
905367d
Fix else-if
Dec 2, 2024
8ae88b2
Add lzma (xz) dep to MacOS
Dec 2, 2024
0d0c20e
Refactor helper run_lzma()
Dec 2, 2024
559485d
Update function doc
Dec 2, 2024
7c69c69
Clarify unit test early termination
Dec 2, 2024
a6d68b8
Update components/core/tests/test-StreamingCompression.cpp
Bill-hbrhbr Dec 2, 2024
1519c21
Split LZMA_RUN from flush actions
Dec 3, 2024
655bb46
Refactor unit test
Dec 3, 2024
4fb6c01
Update components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Bill-hbrhbr Dec 3, 2024
a8799b5
Merge edits
Dec 3, 2024
2b85f01
Fix import
Dec 3, 2024
eda7d6c
Apply suggestions from code review
Bill-hbrhbr Dec 4, 2024
4164a9d
Address review concern
Dec 4, 2024
8ab0653
Add a comment
Dec 5, 2024
c436f21
Apply suggestions from code review
Bill-hbrhbr Dec 6, 2024
7bd34d2
Update comment to 100-char length
Dec 6, 2024
efd2b27
Fix according to coding style guidelines
Dec 11, 2024
c530f92
Apply suggestions from code review
Bill-hbrhbr Dec 12, 2024
e751ee6
Update CMakeLists.txt
Dec 12, 2024
1c5efcd
Address review concern
Dec 12, 2024
856c7cb
Update TODO
Dec 12, 2024
43e22d2
Case fix
Dec 12, 2024
829a6b2
Remove unnecessary function inline comments
Dec 12, 2024
81e1807
Improve comment
Dec 12, 2024
09b73c7
Refactor lzma stream related functions into a nested helper class
Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/core/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ IncludeCategories:
# NOTE: A header is grouped by first matching regex
# Library headers. Update when adding new libraries.
# NOTE: clang-format retains leading white-space on a line in violation of the YAML spec.
- Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|mongocxx\
- Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|lzma|mongocxx\
|msgpack|mysql|openssl|outcome|regex_utils|simdjson|spdlog|sqlite3|string_utils|yaml-cpp|zstd)"
Priority: 3
# C system headers
Expand Down
24 changes: 23 additions & 1 deletion components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Set general compressor
set(GENERAL_COMPRESSOR "zstd" CACHE STRING "The general-purpose compressor used as the 2nd-stage compressor")
set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd)
set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd lzma)
if ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough")
add_definitions(-DUSE_PASSTHROUGH_COMPRESSION=1)
message(STATUS "Using passthrough compression")
elseif ("${GENERAL_COMPRESSOR}" STREQUAL "zstd")
add_definitions(-DUSE_ZSTD_COMPRESSION=1)
message(STATUS "Using Zstandard compression")
elseif ("${GENERAL_COMPRESSOR}" STREQUAL "lzma")
add_definitions(-DUSE_LZMA_COMPRESSION=1)
message(STATUS "Using Lempel–Ziv–Markov chain Algorithm compression")
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
else()
message(SEND_ERROR "GENERAL_COMPRESSOR=${GENERAL_COMPRESSOR} is unimplemented.")
endif()
Expand Down Expand Up @@ -224,6 +227,19 @@ else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for ZStd")
endif()

# Find and set up the LZMA library (liblzma).
# NOTE: There is currently no support for switching between the static and shared library.
# TODO: add a script in ./cmake/Modules to resolve .a vs. .so
find_package(LibLZMA REQUIRED)
if(LIBLZMA_FOUND)
    message(STATUS "Found LibLZMA ${LIBLZMA_VERSION_STRING}")
    message(STATUS "LibLZMA library location: ${LIBLZMA_LIBRARIES}")
    message(STATUS "LibLZMA include dir: ${LIBLZMA_INCLUDE_DIRS}")
else()
    message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for LibLZMA")
endif()
include_directories(${LIBLZMA_INCLUDE_DIRS})

# sqlite dependencies
set(sqlite_DYNAMIC_LIBS "dl;m;pthread")
include(cmake/Modules/FindLibraryDependencies.cmake)
Expand Down Expand Up @@ -462,6 +478,11 @@ set(SOURCE_FILES_unitTest
src/clp/streaming_compression/Compressor.hpp
src/clp/streaming_compression/Constants.hpp
src/clp/streaming_compression/Decompressor.hpp
src/clp/streaming_compression/lzma/Compressor.cpp
src/clp/streaming_compression/lzma/Compressor.hpp
src/clp/streaming_compression/lzma/Decompressor.cpp
src/clp/streaming_compression/lzma/Decompressor.hpp
src/clp/streaming_compression/lzma/Constants.hpp
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
src/clp/streaming_compression/passthrough/Compressor.cpp
src/clp/streaming_compression/passthrough/Compressor.hpp
src/clp/streaming_compression/passthrough/Decompressor.cpp
Expand Down Expand Up @@ -549,6 +570,7 @@ target_link_libraries(unitTest
clp::regex_utils
clp::string_utils
yaml-cpp::yaml-cpp
${LIBLZMA_LIBRARIES}
ZStd::ZStd
)
target_compile_features(unitTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace clp::streaming_compression {
// Enumerates the streaming compressor implementations. Every enumerator is pinned to an explicit
// 8-bit value.
// NOTE(review): the explicit values suggest they are persisted or exchanged somewhere, in which
// case existing values must not be renumbered — confirm against the users of this enum.
enum class CompressorType : uint8_t {
ZSTD = 0x10,
LZMA = 0x20,
Passthrough = 0xFF,
};
} // namespace clp::streaming_compression
Expand Down
214 changes: 214 additions & 0 deletions components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#include "Compressor.hpp"

#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

#include <lzma.h>
#include <spdlog/spdlog.h>

#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../type_utils.hpp"
#include "Constants.hpp"

namespace clp::streaming_compression::lzma {
using clp::size_checked_pointer_cast;

auto Compressor::init_lzma_encoder(LzmaStream* strm, int compression_level, size_t dict_size)
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
-> void {
LzmaOptionsLzma options;
if (0 != lzma_lzma_preset(&options, compression_level)) {
SPDLOG_ERROR("Failed to initialize LZMA options' compression level.");
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
options.dict_size = dict_size;
std::array<LzmaFilter, 2> filters{{
{.id = LZMA_FILTER_LZMA2, .options = &options},
{.id = LZMA_VLI_UNKNOWN, .options = nullptr},
}};

// Initialize the encoder using a preset. Set the integrity to check
// to CRC64, which is the default in the xz command line tool. If
// the .xz file needs to be decompressed with XZ Embedded, use
// LZMA_CHECK_CRC32 instead.
auto const ret{lzma_stream_encoder(strm, filters.data(), LZMA_CHECK_CRC64)};
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved

// Return successfully if the initialization went fine.
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
if (LZMA_OK == ret) {
return;
}

// Something went wrong. The possible errors are documented in
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
// lzma/container.h (src/liblzma/api/lzma/container.h in the source
// package or e.g. /usr/include/lzma/container.h depending on the
// install prefix).
char const* msg{nullptr};
switch (ret) {
case LZMA_MEM_ERROR:
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
msg = "Memory allocation failed";
break;

case LZMA_OPTIONS_ERROR:
msg = "Specified preset is not supported";
break;

case LZMA_UNSUPPORTED_CHECK:
msg = "Specified integrity check is not supported";
break;

default:
// This is most likely LZMA_PROG_ERROR indicating a bug in
// this program or in liblzma. It is inconvenient to have a
// separate error message for errors that should be impossible
// to occur, but knowing the error code is important for
// debugging. That's why it is good to print the error code
// at least when there is no good error message to show.
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
msg = "Unknown error, possibly a bug";
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
break;
}

SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, static_cast<int>(ret));
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Binds the compressor to `file_writer` and prepares a fresh LZMA encoder.
 *
 * @param file_writer Destination for the compressed output.
 * @param compression_level Must lie within [cMinCompressionLevel, cMaxCompressionLevel].
 * @throw OperationFailed with ErrorCode_NotReady if a file writer is already bound.
 * @throw OperationFailed with ErrorCode_Unsupported if `compression_level` is out of range.
 */
auto Compressor::open(FileWriter& file_writer, int compression_level) -> void {
    bool const already_open{nullptr != m_compressed_stream_file_writer};
    if (already_open) {
        throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
    }

    bool const level_in_range{
            cMinCompressionLevel <= compression_level && compression_level <= cMaxCompressionLevel
    };
    if (false == level_in_range) {
        throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__);
    }

    // Zero the stream before handing it to the encoder initializer
    auto* strm = m_compression_stream.get();
    memset(strm, 0, sizeof(LzmaStream));
    init_lzma_encoder(strm, compression_level, m_dict_size);

    // No input yet; the whole block buffer is available for output
    strm->next_out = m_compressed_stream_block_buffer.data();
    strm->avail_out = m_compressed_stream_block_buffer.size();
    strm->next_in = nullptr;
    strm->avail_in = 0;

    m_uncompressed_stream_pos = 0;
    m_compressed_stream_file_writer = &file_writer;
}

/**
 * Finishes the compression stream and unbinds the file writer.
 *
 * @throw OperationFailed with ErrorCode_NotInit if no file writer is bound.
 */
auto Compressor::close() -> void {
    bool const is_open{nullptr != m_compressed_stream_file_writer};
    if (false == is_open) {
        throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
    }

    flush_and_close_compression_stream();

    // Dropping the writer marks the compressor as closed
    m_compressed_stream_file_writer = nullptr;
}

auto Compressor::write(char const* data, size_t data_length) -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (0 == data_length) {
// Nothing needs to be done because we do not need to compress anything
return;
}

if (nullptr == data) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

m_compression_stream->next_in = size_checked_pointer_cast<uint8_t const>(data);
m_compression_stream->avail_in = data_length;

// Normal compression encoding workflow. Continue until the input buffer is
// exhausted.
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
compress(LZMA_RUN);

m_compression_stream->next_in = nullptr;

m_compression_stream_contains_data = true;
m_uncompressed_stream_pos += data_length;
}

/**
 * Makes everything written so far available at the output. Does nothing when no data has been
 * written since the last flush.
 */
auto Compressor::flush() -> void {
    if (m_compression_stream_contains_data) {
        // Ask the encoder to emit all the data it has buffered
        compress(LZMA_SYNC_FLUSH);
        m_compression_stream_contains_data = false;
    }
}

/**
 * Reports the number of uncompressed bytes written since the compressor was opened.
 *
 * @param pos Receives the position; left untouched on failure.
 * @return ErrorCode_NotInit if no file writer is bound.
 * @return ErrorCode_Success otherwise.
 */
auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode {
    if (nullptr != m_compressed_stream_file_writer) {
        pos = m_uncompressed_stream_pos;
        return ErrorCode_Success;
    }

    return ErrorCode_NotInit;
}

/**
 * Finishes the stream, writes out the remaining output and releases the encoder.
 *
 * @throw OperationFailed with ErrorCode_NotInit if no file writer is bound.
 */
auto Compressor::flush_and_close_compression_stream() -> void {
    bool const is_open{nullptr != m_compressed_stream_file_writer};
    if (false == is_open) {
        throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
    }

    // Like a flush, except that the encoder is told no more input will follow
    compress(LZMA_FINISH);
    m_compression_stream_contains_data = false;

    // Release the encoder, then detach the stream from the output buffer
    auto* strm = m_compression_stream.get();
    lzma_end(strm);
    strm->avail_out = 0;
    strm->next_out = nullptr;
}

/**
 * Repeatedly runs the encoder with `action`, writing the output block buffer to the file writer
 * whenever it fills up.
 *
 * The loop ends when
 * - `action` is LZMA_RUN and all the input has been consumed, or
 * - the encoder reports LZMA_STREAM_END.
 * Any output still sitting in the block buffer is written out before returning.
 *
 * @param action The action passed to `lzma_code`.
 * @throw OperationFailed with ErrorCode_Failure if the encoder can make no progress
 * (LZMA_BUF_ERROR) or returns any other unexpected code.
 */
auto Compressor::compress(LzmaAction action) -> void {
    while (true) {
        auto const rc = lzma_code(m_compression_stream.get(), action);
        switch (rc) {
            case LZMA_OK:
            case LZMA_STREAM_END:
                break;
            case LZMA_BUF_ERROR:
                // The encoder can't make progress. Calling it again in the same state would just
                // return the same code, so fail instead of spinning forever.
                SPDLOG_ERROR(
                        "lzma_code() cannot make progress (LZMA_BUF_ERROR) - {}.",
                        static_cast<int>(rc)
                );
                throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
            default:
                SPDLOG_ERROR(
                        "lzma_code() returned an unexpected value - {}.",
                        static_cast<int>(rc)
                );
                throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
        }

        if (LZMA_RUN == action && 0 == m_compression_stream->avail_in) {
            // No more data to compress
            break;
        }

        if (LZMA_STREAM_END == rc) {
            // The flush/finish action has completed
            break;
        }

        // Write output buffer to file if it's full
        if (0 == m_compression_stream->avail_out) {
            pipe_data();
        }
    }

    // Write remaining compressed data
    if (m_compression_stream->avail_out < cCompressedStreamBlockBufferSize) {
        pipe_data();
    }
}
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Writes the filled part of the output block buffer to the file writer, then resets the stream's
 * output cursor so the whole buffer is available again.
 */
auto Compressor::pipe_data() -> void {
    auto* const block_begin = m_compressed_stream_block_buffer.data();

    // Everything the encoder has produced so far is the part that is no longer "available"
    m_compressed_stream_file_writer->write(
            size_checked_pointer_cast<char>(block_begin),
            cCompressedStreamBlockBufferSize - m_compression_stream->avail_out
    );

    m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
    m_compression_stream->next_out = block_begin;
}
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
} // namespace clp::streaming_compression::lzma
Loading
Loading