Skip to content

Commit

Permalink
add compression v2 API and lz4_frame/lz4_raw/lz4_hadoop codec
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 16, 2023
1 parent 9cfa1a6 commit 26dbdea
Show file tree
Hide file tree
Showing 14 changed files with 1,786 additions and 14 deletions.
24 changes: 11 additions & 13 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -424,20 +424,18 @@ resolve_dependency(glog)
set_source(fmt)
resolve_dependency(fmt)

if(NOT ${VELOX_BUILD_MINIMAL})
find_package(ZLIB REQUIRED)
find_package(lz4 REQUIRED)
find_package(lzo2 REQUIRED)
find_package(zstd REQUIRED)
find_package(Snappy REQUIRED)
if(NOT TARGET zstd::zstd)
if(TARGET zstd::libzstd_static)
set(ZSTD_TYPE static)
else()
set(ZSTD_TYPE shared)
endif()
add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE})
find_package(ZLIB REQUIRED)
find_package(lz4 REQUIRED)
find_package(lzo2 REQUIRED)
find_package(zstd REQUIRED)
find_package(Snappy REQUIRED)
if(NOT TARGET zstd::zstd)
if(TARGET zstd::libzstd_static)
set(ZSTD_TYPE static)
else()
set(ZSTD_TYPE shared)
endif()
add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE})
endif()

set_source(re2)
Expand Down
1 change: 1 addition & 0 deletions velox/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
add_subdirectory(base)
add_subdirectory(caching)
add_subdirectory(compression)
add_subdirectory(compression/v2)
add_subdirectory(config)
add_subdirectory(encode)
add_subdirectory(file)
Expand Down
8 changes: 7 additions & 1 deletion velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ std::string compressionKindToString(CompressionKind kind) {
return "lz4";
case CompressionKind_GZIP:
return "gzip";
case CompressionKind_LZ4RAW:
return "lz4_raw";
case CompressionKind_LZ4HADOOP:
return "lz4_hadoop";
}
return folly::to<std::string>("unknown - ", kind);
}
Expand All @@ -89,7 +93,9 @@ CompressionKind stringToCompressionKind(const std::string& kind) {
{"lzo", CompressionKind_LZO},
{"zstd", CompressionKind_ZSTD},
{"lz4", CompressionKind_LZ4},
{"gzip", CompressionKind_GZIP}};
{"gzip", CompressionKind_GZIP},
{"lz4_raw", CompressionKind_LZ4RAW},
{"lz4_hadoop", CompressionKind_LZ4HADOOP}};
auto iter = stringToCompressionKindMap.find(kind);
if (iter != stringToCompressionKindMap.end()) {
return iter->second;
Expand Down
2 changes: 2 additions & 0 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ enum CompressionKind {
CompressionKind_ZSTD = 4,
CompressionKind_LZ4 = 5,
CompressionKind_GZIP = 6,
CompressionKind_LZ4RAW = 7,
CompressionKind_LZ4HADOOP = 8,
CompressionKind_MAX = INT64_MAX
};

Expand Down
5 changes: 5 additions & 0 deletions velox/common/compression/tests/CompressionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ TEST_F(CompressionTest, testCompressionNames) {
EXPECT_EQ("lzo", compressionKindToString(CompressionKind_LZO));
EXPECT_EQ("lz4", compressionKindToString(CompressionKind_LZ4));
EXPECT_EQ("zstd", compressionKindToString(CompressionKind_ZSTD));
EXPECT_EQ("gzip", compressionKindToString(CompressionKind_GZIP));
EXPECT_EQ("lz4_raw", compressionKindToString(CompressionKind_LZ4RAW));
EXPECT_EQ("lz4_hadoop", compressionKindToString(CompressionKind_LZ4HADOOP));
EXPECT_EQ(
"unknown - 99",
compressionKindToString(static_cast<CompressionKind>(99)));
Expand All @@ -56,6 +59,8 @@ TEST_F(CompressionTest, stringToCompressionKind) {
EXPECT_EQ(stringToCompressionKind("lz4"), CompressionKind_LZ4);
EXPECT_EQ(stringToCompressionKind("zstd"), CompressionKind_ZSTD);
EXPECT_EQ(stringToCompressionKind("gzip"), CompressionKind_GZIP);
EXPECT_EQ(stringToCompressionKind("lz4_raw"), CompressionKind_LZ4RAW);
EXPECT_EQ(stringToCompressionKind("lz4_hadoop"), CompressionKind_LZ4HADOOP);
VELOX_ASSERT_THROW(
stringToCompressionKind("bz2"), "Not support compression kind bz2");
}
Expand Down
29 changes: 29 additions & 0 deletions velox/common/compression/v2/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()

add_library(velox_common_compression_v2
Compression.cpp HadoopCompressionFormat.cpp Lz4Compression.cpp)

target_link_libraries(
velox_common_compression_v2
velox_common_base
Folly::folly
Snappy::snappy
zstd::zstd
ZLIB::ZLIB
lz4::lz4)
196 changes: 196 additions & 0 deletions velox/common/compression/v2/Compression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Adapted from Apache Arrow.

#include "velox/common/compression/v2/Compression.h"
#include <memory>
#include <string>
#include <utility>
#include "velox/common/base/Exceptions.h"
#include "velox/common/compression/v2/Lz4Compression.h"

namespace facebook::velox::common {

namespace {
void checkSupportsCompressionLevel(CompressionKind kind) {
VELOX_USER_CHECK(
Codec::supportsCompressionLevel(kind),
"Codec '" + compressionKindToString(kind) +
"' doesn't support setting a compression level.");
}
} // namespace

int32_t Codec::useDefaultCompressionLevel() {
return kUseDefaultCompressionLevel;
}

void Codec::init() {}

bool Codec::supportsGetUncompressedLength(CompressionKind kind) {
switch (kind) {
default:
return false;
}
}

bool Codec::supportsCompressionLevel(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_LZ4:
case CompressionKind::CompressionKind_LZ4RAW:
return true;
default:
return false;
}
}

bool Codec::supportsStreamingCompression(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_LZ4:
case CompressionKind::CompressionKind_GZIP:
case CompressionKind::CompressionKind_ZLIB:
return true;
default:
return false;
}
}

int32_t Codec::maximumCompressionLevel(CompressionKind kind) {
checkSupportsCompressionLevel(kind);
auto codec = Codec::create(kind);
return codec->maximumCompressionLevel();
}

int32_t Codec::minimumCompressionLevel(CompressionKind kind) {
checkSupportsCompressionLevel(kind);
auto codec = Codec::create(kind);
return codec->minimumCompressionLevel();
}

int32_t Codec::defaultCompressionLevel(CompressionKind kind) {
checkSupportsCompressionLevel(kind);
auto codec = Codec::create(kind);
return codec->defaultCompressionLevel();
}

std::unique_ptr<Codec> Codec::create(
CompressionKind kind,
const CodecOptions& codecOptions) {
if (!isAvailable(kind)) {
auto name = compressionKindToString(kind);
if (folly::StringPiece({name}).startsWith("unknown")) {
VELOX_UNSUPPORTED("Unrecognized codec '{}'", name);
}
VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name);
}

auto compressionLevel = codecOptions.compressionLevel;
if (compressionLevel != kUseDefaultCompressionLevel) {
checkSupportsCompressionLevel(kind);
}

std::unique_ptr<Codec> codec;
switch (kind) {
case CompressionKind::CompressionKind_LZ4:
codec = makeLz4FrameCodec(compressionLevel);
break;
case CompressionKind::CompressionKind_LZ4RAW:
codec = makeLz4RawCodec(compressionLevel);
break;
case CompressionKind::CompressionKind_LZ4HADOOP:
codec = makeLz4HadoopRawCodec();
break;
default:
break;
}

if (codec == nullptr) {
VELOX_UNSUPPORTED("LZO codec not implemented");
}

codec->init();

return codec;
}

// use compression level to create Codec
std::unique_ptr<Codec> Codec::create(
CompressionKind kind,
int32_t compressionLevel) {
return create(kind, CodecOptions{compressionLevel});
}

bool Codec::isAvailable(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_NONE:
case CompressionKind::CompressionKind_LZ4:
case CompressionKind::CompressionKind_LZ4RAW:
case CompressionKind::CompressionKind_LZ4HADOOP:
return true;
case CompressionKind::CompressionKind_SNAPPY:
case CompressionKind::CompressionKind_GZIP:
case CompressionKind::CompressionKind_ZLIB:
case CompressionKind::CompressionKind_ZSTD:
case CompressionKind::CompressionKind_LZO:
default:
return false;
}
}

std::optional<uint64_t> Codec::getUncompressedLength(
uint64_t inputLength,
const uint8_t* input,
std::optional<uint64_t> uncompressedLength) const {
if (inputLength == 0) {
if (uncompressedLength.value_or(0) != 0) {
VELOX_USER_CHECK_EQ(
uncompressedLength.value_or(0),
0,
"Invalid uncompressed length: {}.",
*uncompressedLength);
}
return 0;
}
auto actualLength =
doGetUncompressedLength(inputLength, input, uncompressedLength);
if (actualLength) {
if (uncompressedLength) {
VELOX_USER_CHECK_EQ(
*actualLength,
*uncompressedLength,
"Invalid uncompressed length: {}.",
*uncompressedLength);
}
return actualLength;
}
return uncompressedLength;
}

std::optional<uint64_t> Codec::doGetUncompressedLength(
uint64_t inputLength,
const uint8_t* input,
std::optional<uint64_t> uncompressedLength) const {
return uncompressedLength;
}

uint64_t Codec::compressPartial(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) {
VELOX_UNSUPPORTED("'{}' doesn't support partial compression", name());
}
} // namespace facebook::velox::common
Loading

0 comments on commit 26dbdea

Please sign in to comment.