Skip to content

Commit

Permalink
define async decompressor
Browse files Browse the repository at this point in the history
  • Loading branch information
yaqi-zhao committed Nov 27, 2023
1 parent 163e9f2 commit 281e582
Show file tree
Hide file tree
Showing 15 changed files with 963 additions and 544 deletions.
11 changes: 5 additions & 6 deletions velox/dwio/common/QplJobPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@

#include "velox/dwio/common/QplJobPool.h"
#include <folly/Random.h>
#include <iostream>
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::dwio::common {

// std::array<qpl_job*, QplJobHWPool::MAX_JOB_NUMBER>
// QplJobHWPool::hwJobPtrPool;
std::array<std::atomic<bool>, QplJobHWPool::MAX_JOB_NUMBER>
QplJobHWPool::hwJobPtrLocks;
// bool QplJobHWPool::iaa_job_ready = false;
// std::unique_ptr<uint8_t[]> QplJobHWPool::hwJobsBuffer;

QplJobHWPool& QplJobHWPool::getInstance() {
static QplJobHWPool pool;
Expand All @@ -49,6 +44,9 @@ QplJobHWPool::~QplJobHWPool() {
iaaJobReady = false;
}

/**
* Allocate qpl job and put it into hwJobPtrPool
*/
void QplJobHWPool::allocateQPLJob() {
uint32_t job_size = 0;

Expand All @@ -60,6 +58,7 @@ void QplJobHWPool::allocateQPLJob() {

// Initialize pool for storing all job object pointers
// Allocate buffer by shifting address offset for each job object.
hwJobPtrPool.resize(MAX_JOB_NUMBER);
for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) {
qpl_job* qplJobPtr =
reinterpret_cast<qpl_job*>(hwJobsBuffer.get() + i * job_size);
Expand All @@ -84,7 +83,7 @@ void QplJobHWPool::allocateQPLJob() {
* QplJobHWPool maintains MAX_JOB_NUMBER job slot to avoid frequently allocate,
* initialize and release job. Random slots is used to select a job and
* tryLockJob will check if the job is free.
* @return job_id and qpl_job pointer
* @return job_id and qpl_job pair
*/
std::pair<int, qpl_job*> QplJobHWPool::acquireDeflateJob() {
std::pair<int, qpl_job*> res;
Expand Down
10 changes: 8 additions & 2 deletions velox/dwio/common/QplJobPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace facebook::velox::dwio::common {
// together, so that each analytics operation can perform decompress-only,
// filter-only, or decompress-and-filter processing.
//
// Intel QPLis library to provide application programming interface (API) for
// Intel QPL is library to provide application programming interface (API) for
// interaction with Intel® In-Memory Analytics Accelerator (Intel® IAA) hardware
//
// Intel® IAA:
Expand All @@ -61,6 +61,12 @@ class QplJobHWPool {
}

std::pair<int, qpl_job*> acquireDeflateJob();

/**
* Get qpl job by job id
* @param job_id the job id or index in the qpl job pool
* @return nullptr if the job id is invalid
*/
qpl_job* getJobById(int job_id) {
if (job_id >= MAX_JOB_NUMBER || job_id <= 0) {
return nullptr;
Expand All @@ -81,7 +87,7 @@ class QplJobHWPool {
std::unique_ptr<uint8_t[]> hwJobsBuffer;

// Job pool for storing all job object pointers
std::array<qpl_job*, MAX_JOB_NUMBER> hwJobPtrPool;
std::vector<qpl_job*> hwJobPtrPool;

// Locks for accessing each job object pointers
bool iaaJobReady;
Expand Down
98 changes: 98 additions & 0 deletions velox/dwio/common/compression/AsyncCompression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ASYNC_COMPRESSION_H_
#define ASYNC_COMPRESSION_H_

#include <folly/futures/Future.h>
#include "velox/common/compression/Compression.h"

namespace facebook::velox::dwio::common::compression {

using facebook::velox::common::CompressionKind;

class AsyncDecompressor {
public:
explicit AsyncDecompressor(){};

virtual ~AsyncDecompressor() = default;

virtual folly::SemiFuture<uint64_t> decompressAsync(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) = 0;
};

std::unique_ptr<AsyncDecompressor> MakeIAAGzipCodec();

/**
* Get the window size from zlib header(rfc1950).
* 0 1
* +---+---+
* |CMF|FLG| (more-->)
* +---+---+
* bits 0 to 3 CM Compression method
* bits 4 to 7 CINFO Compression info
* CM (Compression method) This identifies the compression method used in the
* file. CM = 8 denotes the "deflate" compression method with a window size up
* to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of
* the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size).
* @param stream_ptr the compressed block length for raw decompression
* @param stream_size compression options to use
*/
static int getZlibWindowBits(const uint8_t* stream_ptr, uint32_t stream_size) {
static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u;
static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u;
static constexpr uint32_t ZLIB_INFO_OFFSET = 4u;
if (stream_size < ZLIB_MIN_HEADER_SIZE) {
return -1;
}
const uint8_t compression_method_and_flag = *stream_ptr++;
const uint8_t compression_method = compression_method_and_flag & 0xf;
const uint8_t compression_info =
compression_method_and_flag >> ZLIB_INFO_OFFSET;

if (CM_ZLIB_DEFAULT_VALUE != compression_method) {
return -1;
}
if (compression_info > 7) {
return -1;
}
return CM_ZLIB_DEFAULT_VALUE + compression_info;
}

/**
* Create a decompressor for the given compression kind in asynchronous mode.
* @param kind the compression type to implement
*/
static std::unique_ptr<dwio::common::compression::AsyncDecompressor>
createAsyncDecompressor(facebook::velox::common::CompressionKind kind) {
switch (static_cast<int64_t>(kind)) {
#ifdef VELOX_ENABLE_INTEL_IAA
case CompressionKind::CompressionKind_GZIP:
return MakeIAAGzipCodec();
#endif
default:
LOG(WARNING) << "Asynchronous mode not support for compression codec "
<< kind;
return nullptr;
}
return nullptr;
}
} // namespace facebook::velox::dwio::common::compression

#endif
8 changes: 8 additions & 0 deletions velox/dwio/common/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ add_library(velox_dwio_common_compression Compression.cpp PagedInputStream.cpp

target_link_libraries(velox_dwio_common_compression velox_dwio_common xsimd
gtest Folly::folly)

if(VELOX_ENABLE_INTEL_IAA)
add_library(velox_dwio_common_iaa_compression IAACompression.cpp)
target_link_libraries(velox_dwio_common_iaa_compression velox_dwio_qpl
Folly::folly xsimd)
target_link_libraries(velox_dwio_common_compression
velox_dwio_common_iaa_compression)
endif()
124 changes: 0 additions & 124 deletions velox/dwio/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,113 +445,6 @@ std::pair<int64_t, bool> ZstdDecompressor::getDecompressedLength(
return {uncompressedLength, true};
}

class GzipIAADecompressor : public AsyncDecompressor {
public:
explicit GzipIAADecompressor() {}

explicit GzipIAADecompressor(
uint64_t blockSize,
const std::string& streamDebugInfo)
: AsyncDecompressor{blockSize, streamDebugInfo} {}

int decompress(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) override;

bool waitResult(int job_id) override;

void releaseJob(int job_id) override;
};

int GzipIAADecompressor::decompress(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) {
#ifdef VELOX_ENABLE_INTEL_IAA
dwio::common::QplJobHWPool& qpl_job_pool =
dwio::common::QplJobHWPool::getInstance();
// int job_id = 0;
auto deflate_job = qpl_job_pool.acquireDeflateJob();
// qpl_job* job = qpl_job_pool.AcquireDeflateJob(job_id);
auto job = deflate_job.second;
if (job == nullptr) {
LOG(WARNING) << "cannot AcquireDeflateJob ";
return -1; // Invalid job id to illustrate the
// failed decompress job.
}
job->op = qpl_op_decompress;
job->next_in_ptr = reinterpret_cast<uint8_t*>(const_cast<char*>(src));
job->next_out_ptr = reinterpret_cast<uint8_t*>(dest);
job->available_in = static_cast<uint32_t>(srcLength);
job->available_out = static_cast<uint32_t>(destLength);
job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE;

qpl_status status = qpl_submit_job(job);
if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
qpl_job_pool.releaseJob(deflate_job.first);
deflate_job = qpl_job_pool.acquireDeflateJob();
job = deflate_job.second;
if (job == nullptr) {
LOG(WARNING)
<< "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR ";
return -1; // Invalid job id to illustrate the
// failed decompress job.
}
job->op = qpl_op_decompress;
job->next_in_ptr = reinterpret_cast<uint8_t*>(const_cast<char*>(src));
job->next_out_ptr = reinterpret_cast<uint8_t*>(dest);
job->available_in = static_cast<uint32_t>(srcLength);
job->available_out = static_cast<uint32_t>(destLength);
job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE;

status = qpl_submit_job(job);
}
if (status != QPL_STS_OK) {
qpl_job_pool.releaseJob(deflate_job.first);
LOG(WARNING) << "cannot submit job, error status: " << status;
return -1; // Invalid job id to illustrate the
// failed decompress job.
} else {
return deflate_job.first;
}
#else
return -1;
#endif
}

bool GzipIAADecompressor::waitResult(int job_id) {
#ifdef VELOX_ENABLE_INTEL_IAA
dwio::common::QplJobHWPool& qpl_job_pool =
dwio::common::QplJobHWPool::getInstance();
if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) {
return true;
}
qpl_job* job = qpl_job_pool.getJobById(job_id);

auto status = qpl_wait_job(job);
qpl_job_pool.releaseJob(job_id);
if (status == QPL_STS_OK) {
return true;
}
LOG(WARNING) << "Decompress w/IAA error, status: " << status;
#endif
return false;
}

void GzipIAADecompressor::releaseJob(int job_id) {
#ifdef VELOX_ENABLE_INTEL_IAA
dwio::common::QplJobHWPool& qpl_job_pool =
dwio::common::QplJobHWPool::getInstance();
if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) {
return;
}
return qpl_job_pool.releaseJob(job_id);
#endif
}

class SnappyDecompressor : public Decompressor {
public:
explicit SnappyDecompressor(
Expand Down Expand Up @@ -833,21 +726,4 @@ std::unique_ptr<dwio::common::SeekableInputStream> createDecompressor(
compressedLength);
}

std::unique_ptr<dwio::common::compression::AsyncDecompressor>
createAsyncDecompressor(
facebook::velox::common::CompressionKind kind,
uint64_t bufferSize,
const std::string& streamDebugInfo) {
std::unique_ptr<AsyncDecompressor> decompressor;
switch (static_cast<int64_t>(kind)) {
case CompressionKind::CompressionKind_GZIP:
return std::make_unique<GzipIAADecompressor>(bufferSize, streamDebugInfo);
default:
LOG(WARNING) << "Asynchronous mode not support for compression codec "
<< kind;
return nullptr;
}
return nullptr;
}

} // namespace facebook::velox::dwio::common::compression
Loading

0 comments on commit 281e582

Please sign in to comment.