Skip to content

Commit

Permalink
pre-decompress with Intel IAA
Browse files Browse the repository at this point in the history
  • Loading branch information
yaqi-zhao committed Nov 7, 2023
1 parent 33538dd commit 9172107
Show file tree
Hide file tree
Showing 16 changed files with 914 additions and 18 deletions.
59 changes: 59 additions & 0 deletions CMake/resolve_dependency_modules/inteliaa.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
include_guard(GLOBAL)

set(VELOX_INTELIAA_VERSION 1.3.0)
set(VELOX_INTELIAA_BUILD_SHA256_CHECKSUM
c3eba4d04a9d7aabcf26c9eaf81f6e9b26d19cb1b87a4a5f197a652cfa98f310)
set(VELOX_INTELIAA_SOURCE_URL
"https://github.com/intel/qpl/archive/refs/tags/v${VELOX_INTELIAA_VERSION}.tar.gz"
)

resolve_dependency_url(INTELIAA)

message(STATUS "Building Intel IAA from source")

set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/install")
set(QPL_STATIC_LIB_NAME
${CMAKE_STATIC_LIBRARY_PREFIX}qpl${CMAKE_STATIC_LIBRARY_SUFFIX})
set(QPL_STATIC_LIB "${QPL_PREFIX}/lib/${QPL_STATIC_LIB_NAME}")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -ldl -laccel-config -L/usr/lib64")

set(QPL_CMAKE_ARGS
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_INSTALL_LIBDIR=${QPL_PREFIX}/lib
-DCMAKE_INSTALL_PREFIX=${QPL_PREFIX}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DQPL_BUILD_TESTS=OFF
-DQPL_BUILD_EXAMPLES=OFF
-DQPL_LIB=ON)

ExternalProject_Add(
intel_iaa
URL ${VELOX_INTELIAA_SOURCE_URL}
URL_HASH ${VELOX_INTELIAA_BUILD_SHA256_CHECKSUM}
BUILD_BYPRODUCTS "${QPL_STATIC_LIB}"
CMAKE_ARGS ${QPL_CMAKE_ARGS})

file(MAKE_DIRECTORY "${QPL_PREFIX}/include")

add_library(iaa::iaa UNKNOWN IMPORTED)
set(QPL_LIBRARIES ${QPL_STATIC_LIB})
set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include")
set_target_properties(
iaa::iaa PROPERTIES IMPORTED_LOCATION ${QPL_LIBRARIES}
INTERFACE_INCLUDE_DIRECTORIES ${QPL_INCLUDE_DIRS})

add_dependencies(iaa::iaa intel_iaa-build)
11 changes: 11 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ option(
VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND
"make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer overflow when the hashtable has billions of records"
OFF)
option(VELOX_ENABLE_INTEL_IAA "Enable Intel IAA support" OFF)

# Explicitly force compilers to generate colored output. Compilers usually do
# this by default if they detect the output is a terminal, but this assumption
Expand Down Expand Up @@ -265,6 +266,11 @@ if(VELOX_ENABLE_REMOTE_FUNCTIONS)
find_package(FBThrift CONFIG REQUIRED)
endif()

if(VELOX_ENABLE_INTEL_IAA)
add_definitions(-DVELOX_ENABLE_INTEL_IAA)
message(STATUS "Intel IAA acceleration enabled")
endif()

# define processor variable for conditional compilation
if(${VELOX_CODEGEN_SUPPORT})
add_compile_definitions(CODEGEN_ENABLED=1)
Expand Down Expand Up @@ -470,6 +476,11 @@ if(NOT ${VELOX_BUILD_MINIMAL})
include_directories(${Protobuf_INCLUDE_DIRS})
endif()

if(VELOX_ENABLE_INTEL_IAA)
set_source(inteliaa)
resolve_dependency(inteliaa)
endif()

# GCC needs to link a library to enable std::filesystem.
if("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU")
set(FILESYSTEM "stdc++fs")
Expand Down
6 changes: 6 additions & 0 deletions velox/dwio/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,9 @@ target_link_libraries(
Boost::regex
Folly::folly
glog::glog)

if(VELOX_ENABLE_INTEL_IAA)
add_library(velox_dwio_qpl QplJobPool.cpp)
target_link_libraries(velox_dwio_qpl iaa::iaa Folly::folly)
target_link_libraries(velox_dwio_common velox_dwio_qpl)
endif()
129 changes: 129 additions & 0 deletions velox/dwio/common/QplJobPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/common/QplJobPool.h"
#include <folly/Random.h>
#include <iostream>
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::dwio::common {

// std::array<qpl_job*, QplJobHWPool::MAX_JOB_NUMBER>
// QplJobHWPool::hwJobPtrPool;
std::array<std::atomic<bool>, QplJobHWPool::MAX_JOB_NUMBER>
QplJobHWPool::hwJobPtrLocks;
// bool QplJobHWPool::iaa_job_ready = false;
// std::unique_ptr<uint8_t[]> QplJobHWPool::hwJobsBuffer;

QplJobHWPool& QplJobHWPool::getInstance() {
static QplJobHWPool pool;
return pool;
}

QplJobHWPool::QplJobHWPool() {
if (!iaaJobReady) {
allocateQPLJob();
}
}

QplJobHWPool::~QplJobHWPool() {
for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) {
if (hwJobPtrPool[i]) {
qpl_fini_job(hwJobPtrPool[i]);
hwJobPtrPool[i] = nullptr;
}
}
iaaJobReady = false;
}

void QplJobHWPool::allocateQPLJob() {
uint32_t job_size = 0;

// Get size required for saving a single qpl job object
qpl_get_job_size(qpl_path, &job_size);

// Allocate entire buffer for storing all job objects
hwJobsBuffer = std::make_unique<uint8_t[]>(job_size * MAX_JOB_NUMBER);

// Initialize pool for storing all job object pointers
// Allocate buffer by shifting address offset for each job object.
for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) {
qpl_job* qplJobPtr =
reinterpret_cast<qpl_job*>(hwJobsBuffer.get() + i * job_size);
auto status = qpl_init_job(qpl_path, qplJobPtr);
if (status != QPL_STS_OK) {
iaaJobReady = false;
LOG(WARNING) << "Initialization of hardware IAA failed, statsu: "
<< status << ". Please check if Intel \
In-Memory Analytics Accelerator (IAA) is properly set up!";
return;
}
this->hwJobPtrPool[i] = qplJobPtr;
hwJobPtrLocks[i].store(false);
}

iaaJobReady = true;
return;
}

/**
* Acquire a deflate job.
* QplJobHWPool maintains MAX_JOB_NUMBER job slot to avoid frequently allocate,
* initialize and release job. Random slots is used to select a job and
* tryLockJob will check if the job is free.
* @return job_id and qpl_job pointer
*/
std::pair<int, qpl_job*> QplJobHWPool::acquireDeflateJob() {
std::pair<int, qpl_job*> res;
res.first = -1;
res.second = nullptr;
if (!job_ready()) {
return res;
}
uint32_t retry = 0;
uint32_t index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1);
while (!tryLockJob(index)) {
index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1);
retry++;
if (retry > MAX_JOB_NUMBER) {
return res;
}
}
res.first = index;
if (index >= MAX_JOB_NUMBER) {
return res;
}
res.second = hwJobPtrPool[index];

return res;
}

void QplJobHWPool::releaseJob(int job_id) {
if (job_id >= MAX_JOB_NUMBER || job_id <= 0) {
return;
}
VELOX_CHECK_LT(job_id, MAX_JOB_NUMBER);
hwJobPtrLocks[job_id].store(false);
return;
}

bool QplJobHWPool::tryLockJob(uint32_t index) {
bool expected = false;
VELOX_CHECK_LT(index, MAX_JOB_NUMBER);
return hwJobPtrLocks[index].compare_exchange_strong(expected, true);
}

} // namespace facebook::velox::dwio::common
91 changes: 91 additions & 0 deletions velox/dwio/common/QplJobPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <mutex>
#include <random>
#include <vector>

#include "qpl/qpl.h"

namespace facebook::velox::dwio::common {

// QplJobHWPool is resource pool to provide the job that will be submitted to
// Intel® IAA Memory for Intel® IAA job will be allocated when the QPLJobHWPool
// instance is created.
//
// The Intel® In-Memory Analytics Accelerator (Intel® IAA) is a hardware
// accelerator that provides very high throughput compression and decompression
// combined with primitive analytic functions. It primarily targets applications
// such as big-data and in-memory analytic databases, as well as
// application-transparent usages such as memory page compression. Intel® IAA
// contains two main functional blocks: Compression and Analytics. The Analytics
// pipe contains two sub-blocks: Decompress and Filter. These functions are tied
// together, so that each analytics operation can perform decompress-only,
// filter-only, or decompress-and-filter processing.
//
// Intel QPLis library to provide application programming interface (API) for
// interaction with Intel® In-Memory Analytics Accelerator (Intel® IAA) hardware
//
// Intel® IAA:
// https://www.intel.com/content/www/us/en/content-details/780887/intel-in-memory-analytics-accelerator-intel-iaa.html
// Intel QPL:
// https://intel.github.io/qpl/documentation/introduction_docs/introduction.html
class QplJobHWPool {
public:
static QplJobHWPool& getInstance();
QplJobHWPool();
~QplJobHWPool();

// Release QPL job by the job_id.
void releaseJob(int job_id);

// Return if the QPL job is allocated sucessfully.
const bool& job_ready() {
return iaaJobReady;
}

std::pair<int, qpl_job*> acquireDeflateJob();
qpl_job* getJobById(int job_id) {
if (job_id >= MAX_JOB_NUMBER || job_id <= 0) {
return nullptr;
}
return hwJobPtrPool[job_id];
}

static constexpr uint32_t MAX_JOB_NUMBER = 1024;

private:
bool tryLockJob(uint32_t index);
void allocateQPLJob();

qpl_path_t qpl_path = qpl_path_hardware;

// Max jobs in QPL_JOB_POOL
// Entire buffer for storing all job objects
std::unique_ptr<uint8_t[]> hwJobsBuffer;

// Job pool for storing all job object pointers
std::array<qpl_job*, MAX_JOB_NUMBER> hwJobPtrPool;

// Locks for accessing each job object pointers
bool iaaJobReady;
static std::array<std::atomic<bool>, MAX_JOB_NUMBER> hwJobPtrLocks;
};

} // namespace facebook::velox::dwio::common
Loading

0 comments on commit 9172107

Please sign in to comment.