Skip to content

Commit

Permalink
pre-decompress gzip w/IAA
Browse files Browse the repository at this point in the history
  • Loading branch information
yaqi-zhao committed Sep 27, 2023
1 parent f9e85f0 commit 5b84c3f
Show file tree
Hide file tree
Showing 16 changed files with 830 additions and 82 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ option(
VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND
"make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer overflow when the hashtable has billions of records"
OFF)
option(VELOX_ENABLE_QPL "Enable Intel QPL support" OFF)

if(${VELOX_BUILD_MINIMAL})
# Enable and disable components for velox base build
Expand Down Expand Up @@ -235,6 +236,11 @@ if(VELOX_ENABLE_REMOTE_FUNCTIONS)
find_package(FBThrift CONFIG REQUIRED)
endif()

if(VELOX_ENABLE_QPL)
add_definitions(-DVELOX_ENABLE_QPL)
message(STATUS "add VELOX_ENABLE_QPL")
endif()

# Turn on Codegen only for Clang and non Mac systems.
if((NOT DEFINED VELOX_CODEGEN_SUPPORT)
AND (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
Expand Down
51 changes: 51 additions & 0 deletions third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,54 @@ if(VELOX_ENABLE_ARROW)
${ARROW_LIBDIR}/libparquet.a)

endif()

if(VELOX_ENABLE_QPL)
message(STATUS "Building QPL from source")
set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/install")
set(QPL_STATIC_LIB_NAME
${CMAKE_STATIC_LIBRARY_PREFIX}qpl${CMAKE_STATIC_LIBRARY_SUFFIX})
set(QPL_STATIC_LIB "${QPL_PREFIX}/lib/${QPL_STATIC_LIB_NAME}")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -ldl -laccel-config -L/usr/lib64")

set(QPL_CMAKE_ARGS
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_INSTALL_LIBDIR=${QPL_PREFIX}/lib
-DCMAKE_INSTALL_PREFIX=${QPL_PREFIX}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DEFFICIENT_WAIT=OFF
-DQPL_BUILD_TESTS=OFF
-DCMAKE_C_COMPILER=gcc
-DCMAKE_CXX_COMPILER=g++
-DQPL_BUILD_EXAMPLES=OFF
-DQPL_LIB=ON)

# todo: qpl newest version will released at Sep, to be updated
# ExternalProject_Add(
# qpl_ep
# ${EP_LOG_OPTIONS}
# URL https://github.com/intel/qpl/archive/refs/tags/v1.2.0.tar.gz
# BUILD_BYPRODUCTS "${QPL_STATIC_LIB}"
# CMAKE_ARGS ${QPL_CMAKE_ARGS})
ExternalProject_Add(qpl_ep
${EP_LOG_OPTIONS}
GIT_REPOSITORY https://github.com/intel-innersource/libraries.performance.accelerators.qpl.qpl-library.git
GIT_TAG origin/develop
GIT_CONFIG https.sslVerify=false
BUILD_BYPRODUCTS "${QPL_STATIC_LIB}"
CMAKE_ARGS ${QPL_CMAKE_ARGS}
)

file(MAKE_DIRECTORY "${QPL_PREFIX}/include")

add_library(qpl::qpl STATIC IMPORTED GLOBAL)
set(QPL_LIBRARIES ${QPL_STATIC_LIB})
set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include")
target_link_libraries(qpl::qpl INTERFACE /usr/lib64/libaccel-config.so)
set_target_properties(
qpl::qpl PROPERTIES IMPORTED_LOCATION ${QPL_LIBRARIES}
INTERFACE_INCLUDE_DIRECTORIES ${QPL_INCLUDE_DIRS})

add_dependencies(qpl::qpl qpl_ep)

endif()
9 changes: 9 additions & 0 deletions velox/dwio/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ add_library(

target_include_directories(velox_dwio_common PRIVATE ${Protobuf_INCLUDE_DIRS})

message(STATUS "QPL_STATIC_LINK_LIBS: ${QPL_STATIC_LINK_LIBS}")

target_link_libraries(
velox_dwio_common
velox_buffer
Expand All @@ -73,4 +75,11 @@ target_link_libraries(
velox_exec
Boost::regex
Folly::folly
${QPL_STATIC_LINK_LIBS}
glog::glog)

if(VELOX_ENABLE_QPL)
add_library(velox_dwio_qpl QplJobPool.cpp)
target_link_libraries(velox_dwio_qpl qpl::qpl Folly::folly)
target_link_libraries(velox_dwio_common velox_dwio_qpl)
endif()
117 changes: 117 additions & 0 deletions velox/dwio/common/QplJobPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/common/QplJobPool.h"
#include <folly/Random.h>
#include <iostream>
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::dwio::common {

std::array<qpl_job*, QplJobHWPool::MAX_JOB_NUMBER>
QplJobHWPool::hw_job_ptr_pool;
std::array<std::atomic<bool>, QplJobHWPool::MAX_JOB_NUMBER>
QplJobHWPool::hw_job_ptr_locks;
bool QplJobHWPool::iaa_job_ready = false;
std::unique_ptr<uint8_t[]> QplJobHWPool::hw_jobs_buffer;

QplJobHWPool& QplJobHWPool::GetInstance() {
static QplJobHWPool pool;
return pool;
}

QplJobHWPool::QplJobHWPool() {
if (!iaa_job_ready) {
(void)AllocateQPLJob();
}
}

QplJobHWPool::~QplJobHWPool() {
for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) {
if (hw_job_ptr_pool[i]) {
qpl_fini_job(hw_job_ptr_pool[i]);
hw_job_ptr_pool[i] = nullptr;
}
}
iaa_job_ready = false;
}

bool QplJobHWPool::AllocateQPLJob() {
uint32_t job_size = 0;

// Get size required for saving a single qpl job object
qpl_get_job_size(qpl_path, &job_size);
// Allocate entire buffer for storing all job objects
hw_jobs_buffer = std::make_unique<uint8_t[]>(job_size * MAX_JOB_NUMBER);
// Initialize pool for storing all job object pointers
// Allocate buffer by shifting address offset for each job object.
for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) {
qpl_job* qpl_job_ptr =
reinterpret_cast<qpl_job*>(hw_jobs_buffer.get() + index * job_size);
auto status = qpl_init_job(qpl_path, qpl_job_ptr);
if (status != QPL_STS_OK) {
iaa_job_ready = false;
LOG(WARNING) << "Initialization of hardware IAA failed, statsu: "
<< status << ". Please check if Intel \
In-Memory Analytics Accelerator (IAA) is properly set up!";
return false;
}
this->hw_job_ptr_pool[index] = qpl_job_ptr;
hw_job_ptr_locks[index].store(false);
}

iaa_job_ready = true;
return true;
}

qpl_job* QplJobHWPool::AcquireDeflateJob(int& job_id) {
job_id = -1;
if (!job_ready()) {
return nullptr;
}
uint32_t retry = 0;
auto index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1);
while (!tryLockJob(index)) {
index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1);
retry++;
if (retry > MAX_JOB_NUMBER) {
return nullptr;
}
}
job_id = index;
if (index >= MAX_JOB_NUMBER) {
return nullptr;
}

return hw_job_ptr_pool[index];
}

void QplJobHWPool::ReleaseJob(int job_id) {
if (job_id >= MAX_JOB_NUMBER || job_id <= 0) {
return;
}
assert(job_id < MAX_JOB_NUMBER);
hw_job_ptr_locks[job_id].store(false);
return;
}

bool QplJobHWPool::tryLockJob(uint32_t index) {
bool expected = false;
assert(index < MAX_JOB_NUMBER);
return hw_job_ptr_locks[index].compare_exchange_strong(expected, true);
}

} // namespace facebook::velox::dwio::common
73 changes: 73 additions & 0 deletions velox/dwio/common/QplJobPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <mutex>
#include <random>
#include <vector>

#include "qpl/qpl.h"

namespace facebook::velox::dwio::common {

// QplJobHWPool is resource pool to provide the job objects, which is
// used for storing context information.
// Memory for QPL job will be allocated when the QPLJobHWPool instance is
// created
class QplJobHWPool {
public:
static QplJobHWPool& GetInstance();
QplJobHWPool();
~QplJobHWPool();

// Release QPL job by the job_id.
void ReleaseJob(int job_id);

// Return if the QPL job is allocated sucessfully.
const bool& job_ready() {
return iaa_job_ready;
}

qpl_job* AcquireDeflateJob(int& job_id);
qpl_job* GetJobById(int job_id) {
if (job_id >= MAX_JOB_NUMBER || job_id <= 0) {
return nullptr;
}
return hw_job_ptr_pool[job_id];
}

static constexpr auto MAX_JOB_NUMBER = 1024;

private:
bool tryLockJob(uint32_t index);
bool AllocateQPLJob();

static constexpr qpl_path_t qpl_path = qpl_path_hardware;
// Max jobs in QPL_JOB_POOL
// Entire buffer for storing all job objects
static std::unique_ptr<uint8_t[]> hw_jobs_buffer;

// Job pool for storing all job object pointers
static std::array<qpl_job*, MAX_JOB_NUMBER> hw_job_ptr_pool;

// Locks for accessing each job object pointers
static bool iaa_job_ready;
static std::array<std::atomic<bool>, MAX_JOB_NUMBER> hw_job_ptr_locks;
};

} // namespace facebook::velox::dwio::common
Loading

0 comments on commit 5b84c3f

Please sign in to comment.