Skip to content

Commit

Permalink
rebase on #5914
Browse files Browse the repository at this point in the history
  • Loading branch information
yaqi-zhao committed Aug 24, 2023
1 parent f90584d commit 6ff1026
Show file tree
Hide file tree
Showing 15 changed files with 472 additions and 2,207 deletions.
1 change: 0 additions & 1 deletion velox/dwio/common/BitConcatenation.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

#include "velox/buffer/Buffer.h"
#pragma once

namespace facebook::velox::dwio::common {

Expand Down
14 changes: 6 additions & 8 deletions velox/dwio/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ add_library(
MetadataFilter.cpp
Options.cpp
OutputStream.cpp
QplJobPool.cpp
Range.cpp
Reader.cpp
ReaderFactory.cpp
Expand All @@ -57,12 +56,6 @@ add_library(
WriterFactory.cpp)

target_include_directories(velox_dwio_common PRIVATE ${Protobuf_INCLUDE_DIRS})
set(QPL_STATIC_LINK_LIBS ${QPL_STATIC_LINK_LIBS})
if(VELOX_ENABLE_QPL)
list(APPEND QPL_STATIC_LINK_LIBS qpl::qpl)
endif()

message(STATUS "QPL_STATIC_LINK_LIBS: ${QPL_STATIC_LINK_LIBS}")

target_link_libraries(
velox_dwio_common
Expand All @@ -77,5 +70,10 @@ target_link_libraries(
velox_memory
Boost::regex
Folly::folly
${QPL_STATIC_LINK_LIBS}
glog::glog)

if(VELOX_ENABLE_QPL)
add_library(velox_dwio_qpl QplJobPool.cpp)
target_link_libraries(velox_dwio_qpl qpl::qpl Folly::folly)
target_link_libraries(velox_dwio_common velox_dwio_qpl)
endif()
20 changes: 8 additions & 12 deletions velox/dwio/common/QplJobPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#include <iostream>
#include "velox/common/base/Exceptions.h"

#ifdef VELOX_ENABLE_QPL

namespace facebook::velox::dwio::common {

std::array<qpl_job*, QplJobHWPool::MAX_JOB_NUMBER>
Expand All @@ -30,8 +28,6 @@ std::array<std::atomic<bool>, QplJobHWPool::MAX_JOB_NUMBER>
bool QplJobHWPool::iaa_job_ready = false;
std::unique_ptr<uint8_t[]> QplJobHWPool::hw_jobs_buffer;

// static QplJobHWPool pool = QplJobHWPool::GetInstance();

QplJobHWPool& QplJobHWPool::GetInstance() {
static QplJobHWPool pool;
return pool;
Expand All @@ -56,12 +52,12 @@ QplJobHWPool::~QplJobHWPool() {
bool QplJobHWPool::AllocateQPLJob() {
uint32_t job_size = 0;

/// Get size required for saving a single qpl job object
// Get size required for saving a single qpl job object
qpl_get_job_size(qpl_path, &job_size);
/// Allocate entire buffer for storing all job objects
// Allocate entire buffer for storing all job objects
hw_jobs_buffer = std::make_unique<uint8_t[]>(job_size * MAX_JOB_NUMBER);
/// Initialize pool for storing all job object pointers
/// Reallocate buffer by shifting address offset for each job object.
// Initialize pool for storing all job object pointers
// Allocate buffer by shifting address offset for each job object.
for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) {
qpl_job* qpl_job_ptr =
reinterpret_cast<qpl_job*>(hw_jobs_buffer.get() + index * job_size);
Expand All @@ -81,7 +77,8 @@ bool QplJobHWPool::AllocateQPLJob() {
return true;
}

qpl_job* QplJobHWPool::AcquireDeflateJob(uint32_t& job_id) {
qpl_job* QplJobHWPool::AcquireDeflateJob(int& job_id) {
job_id = -1;
if (!job_ready()) {
return nullptr;
}
Expand All @@ -102,8 +99,8 @@ qpl_job* QplJobHWPool::AcquireDeflateJob(uint32_t& job_id) {
return hw_job_ptr_pool[index];
}

void QplJobHWPool::ReleaseJob(uint32_t job_id) {
if (job_id >= MAX_JOB_NUMBER) {
void QplJobHWPool::ReleaseJob(int job_id) {
if (job_id >= MAX_JOB_NUMBER || job_id <= 0) {
return;
}
assert(job_id < MAX_JOB_NUMBER);
Expand All @@ -118,4 +115,3 @@ bool QplJobHWPool::tryLockJob(uint32_t index) {
}

} // namespace facebook::velox::dwio::common
#endif
46 changes: 19 additions & 27 deletions velox/dwio/common/QplJobPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,52 @@
#include <mutex>
#include <random>
#include <vector>

#ifdef VELOX_ENABLE_QPL
#include "qpl/qpl.h"

namespace facebook::velox::dwio::common {

/// QplJobHWPool is resource pool to provide the job objects, which is
/// used for storing context information during.
/// Memory for QPL job will be allocated when the QPLJobHWPool instance is
/// created
///
// QPL job can offload RLE-decoding/Filter/(De)compression works to hardware
// accelerator.
// QplJobHWPool is resource pool to provide the job objects, which is
// used for storing context information.
// Memory for QPL job will be allocated when the QPLJobHWPool instance is
// created
class QplJobHWPool {
public:
static QplJobHWPool& GetInstance();
QplJobHWPool();
~QplJobHWPool();
/// Acquire QPL job
///
/// @param job_id QPL job id, used when release QPL job
/// \return Pointer to the QPL job. If acquire job failed, return nullptr.
qpl_job* AcquireDeflateJob(uint32_t& job_id);

/// \brief Release QPL job by the job_id.
void ReleaseJob(uint32_t job_id);
// Release QPL job by the job_id.
void ReleaseJob(int job_id);

/// \brief Return if the QPL job is allocated sucessfully.
// Return if the QPL job is allocated sucessfully.
const bool& job_ready() {
return iaa_job_ready;
}

qpl_job* GetJobById(uint32_t job_id) {
qpl_job* AcquireDeflateJob(int& job_id);
qpl_job* GetJobById(int job_id) {
if (job_id >= MAX_JOB_NUMBER || job_id <= 0) {
return nullptr;
}
return hw_job_ptr_pool[job_id];
}

static constexpr qpl_path_t qpl_path = qpl_path_hardware;

// Max jobs in QPL_JOB_POOL
static constexpr auto MAX_JOB_NUMBER = 1024;

private:
bool tryLockJob(uint32_t index);
bool AllocateQPLJob();
static constexpr qpl_path_t qpl_path = qpl_path_hardware;
// Job pool for storing all job object pointers
static std::array<qpl_job*, MAX_JOB_NUMBER> hw_job_ptr_pool;

/// Max jobs in QPL_JOB_POOL
/// Entire buffer for storing all job objects
// Entire buffer for storing all job objects
static std::unique_ptr<uint8_t[]> hw_jobs_buffer;
/// Job pool for storing all job object pointers
static std::array<qpl_job*, MAX_JOB_NUMBER> hw_job_ptr_pool;

/// Locks for accessing each job object pointers
static bool iaa_job_ready;
// Locks for accessing each job object pointers
static std::array<std::atomic<bool>, MAX_JOB_NUMBER> hw_job_ptr_locks;
static bool iaa_job_ready;
};

} // namespace facebook::velox::dwio::common
#endif
106 changes: 106 additions & 0 deletions velox/dwio/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,97 @@ uint64_t ZlibDecompressor::decompress(
return destLength - zstream_.avail_out;
}

class GzipIAADecompressor : public AsyncDecompressor {
public:
explicit GzipIAADecompressor() {}

explicit GzipIAADecompressor(
uint64_t blockSize,
const std::string& streamDebugInfo)
: AsyncDecompressor{blockSize, streamDebugInfo} {}

int decompress(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) override;

bool waitResult(int job_id) override;
};

int GzipIAADecompressor::decompress(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) {
#ifdef VELOX_ENABLE_QPL
dwio::common::QplJobHWPool& qpl_job_pool =
dwio::common::QplJobHWPool::GetInstance();
int job_id = 0;
qpl_job* job = qpl_job_pool.AcquireDeflateJob(job_id);
if (job == nullptr) {
LOG(WARNING) << "cannot AcquireDeflateJob ";
return -1; // Invalid job id to illustrate the
// failed decompress job.
}
job->op = qpl_op_decompress;
job->next_in_ptr = reinterpret_cast<uint8_t*>(const_cast<char*>(src));
job->next_out_ptr = reinterpret_cast<uint8_t*>(dest);
job->available_in = static_cast<uint32_t>(srcLength);
job->available_out = static_cast<uint32_t>(destLength);
job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE;

qpl_status status = qpl_submit_job(job);
if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
qpl_job_pool.ReleaseJob(job_id);
job = qpl_job_pool.AcquireDeflateJob(job_id);
if (job == nullptr) {
LOG(WARNING)
<< "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR ";
return -1; // Invalid job id to illustrate the
// failed decompress job.
}
job->op = qpl_op_decompress;
job->next_in_ptr = reinterpret_cast<uint8_t*>(const_cast<char*>(src));
job->next_out_ptr = reinterpret_cast<uint8_t*>(dest);
job->available_in = static_cast<uint32_t>(srcLength);
job->available_out = static_cast<uint32_t>(destLength);
job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE;

status = qpl_submit_job(job);
}
if (status != QPL_STS_OK) {
qpl_job_pool.ReleaseJob(job_id);
LOG(WARNING) << "cannot submit job, error status: " << status;
return -1; // Invalid job id to illustrate the
// failed decompress job.
} else {
return job_id;
}
#else
return -1;
#endif
}

bool GzipIAADecompressor::waitResult(int job_id) {
#ifdef VELOX_ENABLE_QPL
dwio::common::QplJobHWPool& qpl_job_pool =
dwio::common::QplJobHWPool::GetInstance();
if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) {
return true;
}
qpl_job* job = qpl_job_pool.GetJobById(job_id);

auto status = qpl_wait_job(job);
qpl_job_pool.ReleaseJob(job_id);
if (status == QPL_STS_OK) {
return true;
}
LOG(WARNING) << "Decompress w/IAA error, status: " << status;
#endif
return false;
}

class LzoDecompressor : public Decompressor {
public:
explicit LzoDecompressor(
Expand Down Expand Up @@ -573,4 +664,19 @@ std::unique_ptr<dwio::common::SeekableInputStream> createDecompressor(
compressedLength);
}

std::unique_ptr<dwio::common::compression::AsyncDecompressor>
createAsyncDecompressor(
facebook::velox::common::CompressionKind kind,
uint64_t bufferSize,
const std::string& streamDebugInfo) {
std::unique_ptr<AsyncDecompressor> decompressor;
switch (static_cast<int64_t>(kind)) {
case CompressionKind::CompressionKind_GZIP:
return std::make_unique<GzipIAADecompressor>(bufferSize, streamDebugInfo);
default:
DWIO_RAISE("Asynchronous mode not support for compression codec ", kind);
}
return nullptr;
}

} // namespace facebook::velox::dwio::common::compression
Loading

0 comments on commit 6ff1026

Please sign in to comment.