Skip to content

Commit

Permalink
pre-decompress gzip w/IAA
Browse files Browse the repository at this point in the history
  • Loading branch information
yaqi-zhao committed Aug 21, 2023
1 parent 24a66e7 commit 325e9c1
Show file tree
Hide file tree
Showing 17 changed files with 2,460 additions and 6 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ option(
VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND
"make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer overflow when the hashtable has billions of records"
OFF)
# Opt-in switch for Intel QPL support; disabled by default.
option(VELOX_ENABLE_QPL "Enable Intel QPL support" OFF)

if(${VELOX_BUILD_MINIMAL})
# Enable and disable components for velox base build
Expand Down Expand Up @@ -221,6 +222,11 @@ if(VELOX_ENABLE_REMOTE_FUNCTIONS)
find_package(FBThrift CONFIG REQUIRED)
endif()

# When QPL support is requested, define the VELOX_ENABLE_QPL preprocessor
# macro so that sources guarded by `#ifdef VELOX_ENABLE_QPL` are compiled in.
if(VELOX_ENABLE_QPL)
add_definitions(-DVELOX_ENABLE_QPL)
message(STATUS "add VELOX_ENABLE_QPL")
endif()

# Turn on Codegen only for Clang and non Mac systems.
if((NOT DEFINED VELOX_CODEGEN_SUPPORT)
AND (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
Expand Down
41 changes: 41 additions & 0 deletions third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,44 @@ if(VELOX_ENABLE_ARROW)
${ARROW_LIBDIR}/libparquet.a)

endif()

# Build Intel QPL from source as an external project and expose it to the rest
# of the build as the imported static library target `qpl::qpl`.
if(VELOX_ENABLE_QPL)
  message(STATUS "Building QPL from source")
  set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/install")
  set(QPL_STATIC_LIB_NAME
      ${CMAKE_STATIC_LIBRARY_PREFIX}qpl${CMAKE_STATIC_LIBRARY_SUFFIX})
  set(QPL_STATIC_LIB "${QPL_PREFIX}/lib/${QPL_STATIC_LIB_NAME}")

  # Locate libaccel-config instead of assuming it lives in /usr/lib64; the
  # library directory differs between distributions. /usr/lib64 is kept as a
  # hint so existing setups resolve to the same file as before.
  find_library(
    QPL_ACCEL_CONFIG_LIBRARY
    NAMES accel-config
    HINTS /usr/lib64)
  if(NOT QPL_ACCEL_CONFIG_LIBRARY)
    message(
      FATAL_ERROR
        "VELOX_ENABLE_QPL requires libaccel-config, but it was not found")
  endif()
  get_filename_component(QPL_ACCEL_CONFIG_LIBDIR "${QPL_ACCEL_CONFIG_LIBRARY}"
                         DIRECTORY)

  # Flags handed to the QPL build only. They are kept in a local variable so
  # the linker options do not leak into CMAKE_CXX_FLAGS of the enclosing scope.
  set(QPL_CXX_FLAGS
      "${CMAKE_CXX_FLAGS} -ldl -laccel-config -L${QPL_ACCEL_CONFIG_LIBDIR}")

  # NOTE(review): the compilers are pinned to gcc/g++ regardless of the
  # toolchain used for the rest of the build - confirm this is intentional.
  set(QPL_CMAKE_ARGS
      -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
      -DCMAKE_INSTALL_LIBDIR=${QPL_PREFIX}/lib
      -DCMAKE_INSTALL_PREFIX=${QPL_PREFIX}
      "-DCMAKE_CXX_FLAGS=${QPL_CXX_FLAGS}"
      -DEFFICIENT_WAIT=OFF
      -DQPL_BUILD_TESTS=OFF
      -DCMAKE_C_COMPILER=gcc
      -DCMAKE_CXX_COMPILER=g++
      -DQPL_LIB=ON)

  # TODO: bump to the next QPL release once it is published (expected in
  # September).
  # NOTE(review): consider adding URL_HASH so the downloaded archive is
  # verified.
  ExternalProject_Add(
    qpl_ep
    ${EP_LOG_OPTIONS}
    URL https://github.com/intel/qpl/archive/refs/tags/v1.2.0.tar.gz
    BUILD_BYPRODUCTS "${QPL_STATIC_LIB}"
    CMAKE_ARGS ${QPL_CMAKE_ARGS})

  # The include directory must exist at configure time, otherwise CMake rejects
  # it as INTERFACE_INCLUDE_DIRECTORIES of an imported target.
  file(MAKE_DIRECTORY "${QPL_PREFIX}/include")

  add_library(qpl::qpl STATIC IMPORTED GLOBAL)
  set(QPL_LIBRARIES ${QPL_STATIC_LIB})
  set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include")
  target_link_libraries(qpl::qpl INTERFACE "${QPL_ACCEL_CONFIG_LIBRARY}"
                                           ${CMAKE_DL_LIBS})
  set_target_properties(
    qpl::qpl
    PROPERTIES IMPORTED_LOCATION "${QPL_LIBRARIES}"
               INTERFACE_INCLUDE_DIRECTORIES "${QPL_INCLUDE_DIRS}")

  # Make sure the external project is built before anything links qpl::qpl.
  add_dependencies(qpl::qpl qpl_ep)

endif()
1 change: 1 addition & 0 deletions velox/dwio/common/BitConcatenation.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

// The include guard comes first so that it precedes every include of this
// header.
#pragma once

#include "velox/buffer/Buffer.h"

namespace facebook::velox::dwio::common {

Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ add_library(
MetadataFilter.cpp
Options.cpp
OutputStream.cpp
QplJobPool.cpp
Range.cpp
Reader.cpp
ReaderFactory.cpp
Expand All @@ -56,6 +57,12 @@ add_library(
WriterFactory.cpp)

target_include_directories(velox_dwio_common PRIVATE ${Protobuf_INCLUDE_DIRS})
# Collect the optional QPL link dependency that is appended to the link line
# of velox_dwio_common below.
# NOTE(review): the self-assignment only copies whatever value
# QPL_STATIC_LINK_LIBS already has in an enclosing scope (empty if unset) -
# confirm whether an explicit empty initialization was intended.
set(QPL_STATIC_LINK_LIBS ${QPL_STATIC_LINK_LIBS})
if(VELOX_ENABLE_QPL)
# qpl::qpl is only linked when QPL support is enabled.
list(APPEND QPL_STATIC_LINK_LIBS qpl::qpl)
endif()

# Print the resulting list at configure time.
message(STATUS "QPL_STATIC_LINK_LIBS: ${QPL_STATIC_LINK_LIBS}")

target_link_libraries(
velox_dwio_common
Expand All @@ -70,4 +77,5 @@ target_link_libraries(
velox_memory
Boost::regex
Folly::folly
${QPL_STATIC_LINK_LIBS}
glog::glog)
122 changes: 122 additions & 0 deletions velox/dwio/common/QplJobPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "velox/dwio/common/QplJobPool.h"
#include <folly/Random.h>
#include <iostream>
#include "velox/common/base/Exceptions.h"

#ifdef VELOX_ENABLE_QPL

namespace facebook::velox::dwio::common {

// Out-of-class definitions of QplJobHWPool's static data members.

// Pointers to the individual job objects, indexed by job id.
std::array<qpl_job*, QplJobHWPool::MAX_JOB_NUMBER>
QplJobHWPool::hw_job_ptr_pool;
// Per-job busy flags; `true` means the job at that index is currently held.
std::array<std::atomic<bool>, QplJobHWPool::MAX_JOB_NUMBER>
QplJobHWPool::hw_job_ptr_locks;
// Set to true once every job object has been initialized successfully.
bool QplJobHWPool::iaa_job_ready = false;
// Single backing buffer that stores all job objects.
std::unique_ptr<uint8_t[]> QplJobHWPool::hw_jobs_buffer;

// static QplJobHWPool pool = QplJobHWPool::GetInstance();

/// Returns the shared pool object, constructing it on the first call.
QplJobHWPool& QplJobHWPool::GetInstance() {
  static QplJobHWPool instance;
  return instance;
}

/// Sets up the shared job objects unless they have already been initialized.
QplJobHWPool::QplJobHWPool() {
  if (iaa_job_ready) {
    return;
  }
  // The outcome is recorded in iaa_job_ready, so the return value is not
  // needed here.
  static_cast<void>(AllocateQPLJob());
}

/// Finalizes every job object that was initialized and marks the pool as not
/// ready.
QplJobHWPool::~QplJobHWPool() {
  for (auto& job : hw_job_ptr_pool) {
    if (job == nullptr) {
      continue;
    }
    qpl_fini_job(job);
    job = nullptr;
  }
  iaa_job_ready = false;
}

bool QplJobHWPool::AllocateQPLJob() {
uint32_t job_size = 0;

/// Get size required for saving a single qpl job object
qpl_get_job_size(qpl_path, &job_size);
/// Allocate entire buffer for storing all job objects
hw_jobs_buffer = std::make_unique<uint8_t[]>(job_size * MAX_JOB_NUMBER);
/// Initialize pool for storing all job object pointers
/// Reallocate buffer by shifting address offset for each job object.
for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) {
qpl_job* qpl_job_ptr =
reinterpret_cast<qpl_job*>(hw_jobs_buffer.get() + index * job_size);
auto status = qpl_init_job(qpl_path, qpl_job_ptr);
if (status != QPL_STS_OK) {
iaa_job_ready = false;
LOG(WARNING) << "Initialization of hardware IAA failed, statsu: "
<< status << ". Please check if Intel \
In-Memory Analytics Accelerator (IAA) is properly set up!";
return false;
}
this->hw_job_ptr_pool[index] = qpl_job_ptr;
hw_job_ptr_locks[index].store(false);
}

iaa_job_ready = true;
return true;
}

/// Tries to reserve a free job by probing randomly chosen slots.
///
/// \param job_id Receives the id of the reserved job on success; it is left
/// unmodified when no job could be reserved.
/// \return Pointer to the reserved job, or nullptr if the pool is not ready or
/// no free slot was found after MAX_JOB_NUMBER retries.
qpl_job* QplJobHWPool::AcquireDeflateJob(uint32_t& job_id) {
  if (!job_ready()) {
    return nullptr;
  }

  // folly::Random::rand32(min, max) samples the half-open range [min, max), so
  // the upper bound has to be MAX_JOB_NUMBER for the last slot to be
  // reachable.
  // NOTE(review): slot 0 is never handed out, exactly as before - confirm
  // whether it is reserved on purpose.
  uint32_t index = folly::Random::rand32(1, MAX_JOB_NUMBER);
  uint32_t retry = 0;
  while (!tryLockJob(index)) {
    if (++retry > MAX_JOB_NUMBER) {
      return nullptr;
    }
    index = folly::Random::rand32(1, MAX_JOB_NUMBER);
  }

  // The slot is now held by the caller until ReleaseJob(job_id) is called.
  job_id = index;
  return hw_job_ptr_pool[index];
}

/// Marks the job with the given id as free again. Ids outside the pool are
/// ignored.
void QplJobHWPool::ReleaseJob(uint32_t job_id) {
  if (job_id < MAX_JOB_NUMBER) {
    hw_job_ptr_locks[job_id].store(false);
  }
}

/// Attempts to flip the busy flag of slot `index` from false to true.
///
/// \return true if this call reserved the slot, false if it was already held.
bool QplJobHWPool::tryLockJob(uint32_t index) {
  assert(index < MAX_JOB_NUMBER);
  bool unlocked = false;
  return hw_job_ptr_locks[index].compare_exchange_strong(unlocked, true);
}

} // namespace facebook::velox::dwio::common
#endif
80 changes: 80 additions & 0 deletions velox/dwio/common/QplJobPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <array>
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <random>
#include <vector>

#ifdef VELOX_ENABLE_QPL
#include "qpl/qpl.h"

namespace facebook::velox::dwio::common {

/// QplJobHWPool is a resource pool that provides QPL job objects, which store
/// the context information of a QPL operation.
/// Memory for the QPL jobs is allocated when the QplJobHWPool instance is
/// created.
///
/// A QPL job can offload RLE-decoding/Filter/(De)compression work to the
/// hardware accelerator.
class QplJobHWPool {
 public:
  /// Returns the shared pool instance.
  static QplJobHWPool& GetInstance();
  QplJobHWPool();
  ~QplJobHWPool();

  /// \brief Acquire a QPL job.
  ///
  /// \param job_id Receives the QPL job id on success; pass it to ReleaseJob
  /// to give the job back.
  /// \return Pointer to the QPL job. If acquiring a job failed, returns
  /// nullptr.
  qpl_job* AcquireDeflateJob(uint32_t& job_id);

  /// \brief Release the QPL job identified by job_id.
  void ReleaseJob(uint32_t job_id);

  /// \brief Return whether the QPL jobs were allocated successfully.
  const bool& job_ready() {
    return iaa_job_ready;
  }

  /// \brief Look up a job by id.
  ///
  /// \return Pointer stored for job_id, or nullptr if job_id is outside the
  /// pool (the same range check ReleaseJob applies).
  qpl_job* GetJobById(uint32_t job_id) {
    if (job_id >= MAX_JOB_NUMBER) {
      return nullptr;
    }
    return hw_job_ptr_pool[job_id];
  }

  /// Execution path passed to QPL when sizing and initializing jobs.
  static constexpr qpl_path_t qpl_path = qpl_path_hardware;

  /// Max jobs in the pool.
  static constexpr auto MAX_JOB_NUMBER = 1024;

 private:
  /// Atomically marks the job at `index` as held; returns false if it already
  /// was.
  bool tryLockJob(uint32_t index);
  /// Allocates the job buffer and initializes every job object.
  bool AllocateQPLJob();

  /// Entire buffer for storing all job objects
  static std::unique_ptr<uint8_t[]> hw_jobs_buffer;
  /// Job pool for storing all job object pointers
  static std::array<qpl_job*, MAX_JOB_NUMBER> hw_job_ptr_pool;

  /// True once all job objects have been initialized successfully
  static bool iaa_job_ready;
  /// Locks for accessing each job object pointer
  static std::array<std::atomic<bool>, MAX_JOB_NUMBER> hw_job_ptr_locks;
};

} // namespace facebook::velox::dwio::common
#endif
1 change: 1 addition & 0 deletions velox/dwio/parquet/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_library(
PageReader.cpp
ParquetColumnReader.cpp
ParquetData.cpp
IAAPageReader.cpp
RepeatedColumnReader.cpp
RleBpDecoder.cpp
Statistics.cpp
Expand Down
Loading

0 comments on commit 325e9c1

Please sign in to comment.