Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add unified compression API and lz4_frame/lz4_raw/lz4_hadoop codec #7589

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
11 changes: 10 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)
option(VELOX_ENABLE_COMPRESSION_LZ4 "Enable Lz4 compression support." OFF)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably do not need this option.


option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF)
option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF)
Expand Down Expand Up @@ -173,6 +174,10 @@ if(${VELOX_BUILD_MINIMAL} OR ${VELOX_BUILD_MINIMAL_WITH_DWIO})
set(VELOX_ENABLE_SUBSTRAIT OFF)
endif()

if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR})
set(VELOX_ENABLE_COMPRESSION_LZ4 ON)
endif()

if(${VELOX_BUILD_TESTING})
# Enable all components to build testing binaries
set(VELOX_ENABLE_PRESTO_FUNCTIONS ON)
Expand All @@ -184,6 +189,7 @@ if(${VELOX_BUILD_TESTING})
set(VELOX_ENABLE_SPARK_FUNCTIONS ON)
set(VELOX_ENABLE_EXAMPLES ON)
set(VELOX_ENABLE_PARQUET ON)
set(VELOX_ENABLE_COMPRESSION_LZ4 ON)
endif()

if(${VELOX_ENABLE_BENCHMARKS})
Expand Down Expand Up @@ -469,12 +475,15 @@ velox_resolve_dependency(glog)
velox_set_source(fmt)
velox_resolve_dependency(fmt 9.0.0)

if(VELOX_ENABLE_COMPRESSION_LZ4)
find_package(lz4 REQUIRED)
endif()

if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR})
# DWIO needs all sorts of stream compression libraries.
#
# TODO: make these optional and pluggable.
find_package(ZLIB REQUIRED)
find_package(lz4 REQUIRED)
find_package(lzo2 REQUIRED)
find_package(zstd REQUIRED)
find_package(Snappy REQUIRED)
Expand Down
10 changes: 9 additions & 1 deletion velox/common/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,13 @@ endif()
velox_add_library(velox_common_compression Compression.cpp LzoDecompressor.cpp)
velox_link_libraries(
velox_common_compression
PUBLIC Folly::folly
PUBLIC velox_status Folly::folly
PRIVATE velox_exception)

if(VELOX_ENABLE_COMPRESSION_LZ4)
velox_sources(velox_common_compression PRIVATE Lz4Compression.cpp
HadoopCompressionFormat.cpp)
velox_link_libraries(velox_common_compression PUBLIC lz4::lz4)
velox_compile_definitions(velox_common_compression
PRIVATE VELOX_ENABLE_COMPRESSION_LZ4)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah it's not used in a header so PRIVATE is better, good catch!

endif()
131 changes: 131 additions & 0 deletions velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include "velox/common/compression/Compression.h"
#include "velox/common/base/Exceptions.h"
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
#include "velox/common/compression/Lz4Compression.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like he we would protect this with a a VELOX_ENABLE_COMPRESSION_LZ4 (or similar)

#endif

#include <folly/Conv.h>

Expand Down Expand Up @@ -98,4 +101,132 @@ CompressionKind stringToCompressionKind(const std::string& kind) {
VELOX_UNSUPPORTED("Not support compression kind {}", kind);
}
}

Status Codec::init() {
return Status::OK();
}

bool Codec::supportsGetUncompressedLength(CompressionKind kind) {
// TODO: Return true if it's supported by compression kind.
return false;
}

bool Codec::supportsStreamingCompression(CompressionKind kind) {
switch (kind) {
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
case CompressionKind::CompressionKind_LZ4:
return true;
#endif
default:
return false;
}
}

bool Codec::supportsCompressFixedLength(CompressionKind kind) {
// TODO: Return true if it's supported by compression kind.
return false;
}

Expected<std::unique_ptr<Codec>> Codec::create(
CompressionKind kind,
const CodecOptions& codecOptions) {
if (!isAvailable(kind)) {
auto name = compressionKindToString(kind);
VELOX_RETURN_UNEXPECTED_IF(
folly::StringPiece({name}).startsWith("unknown"),
Status::Invalid("Unrecognized codec: ", name));
return folly::makeUnexpected(Status::Invalid(
"Support for codec '{}' is either not built or not implemented.",
name));
}

auto compressionLevel = codecOptions.compressionLevel;
std::unique_ptr<Codec> codec;
switch (kind) {
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
case CompressionKind::CompressionKind_LZ4:
if (auto options = dynamic_cast<const Lz4CodecOptions*>(&codecOptions)) {
switch (options->type) {
case Lz4CodecOptions::kLz4Frame:
codec = makeLz4FrameCodec(compressionLevel);
break;
case Lz4CodecOptions::kLz4Raw:
codec = makeLz4RawCodec(compressionLevel);
break;
case Lz4CodecOptions::kLz4Hadoop:
codec = makeLz4HadoopCodec();
break;
}
}
// By default, create LZ4 Frame codec.
codec = makeLz4FrameCodec(compressionLevel);
break;
#endif
default:
break;
}
VELOX_RETURN_UNEXPECTED_IF(
codec == nullptr,
Status::Invalid(fmt::format(
"Support for codec '{}' is either not built or not implemented.",
compressionKindToString(kind))));

VELOX_RETURN_UNEXPECTED_NOT_OK(codec->init());

return codec;
}

Expected<std::unique_ptr<Codec>> Codec::create(
CompressionKind kind,
int32_t compressionLevel) {
return create(kind, CodecOptions{compressionLevel});
}

bool Codec::isAvailable(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_NONE:
return true;
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
case CompressionKind::CompressionKind_LZ4:
return true;
#endif
default:
return false;
}
}

std::optional<uint64_t> Codec::getUncompressedLength(
const uint8_t* input,
uint64_t inputLength) const {
return std::nullopt;
}

Expected<uint64_t> Codec::compressFixedLength(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) {
return folly::makeUnexpected(
Status::Invalid("'{}' doesn't support fixed-length compression", name()));
}

Expected<std::shared_ptr<StreamingCompressor>>
Codec::makeStreamingCompressor() {
return folly::makeUnexpected(Status::Invalid(
"Streaming compression is unsupported with {} format.", name()));
}

Expected<std::shared_ptr<StreamingDecompressor>>
Codec::makeStreamingDecompressor() {
return folly::makeUnexpected(Status::Invalid(
"Streaming decompression is unsupported with {} format.", name()));
}

int32_t Codec::compressionLevel() const {
return kUseDefaultCompressionLevel;
}

std::string Codec::name() const {
return compressionKindToString(compressionKind());
}
} // namespace facebook::velox::common
Loading
Loading