[GLUTEN-8397][CH][Part-1]: Disable hdfs while compiling clickhouse backend on macOS (#8400)

* [CH]: Disable hdfs while compiling clickhouse backend on macOS

* add macos compile support without hdfs

* change unsafe cast to static_cast
yxheartipp authored Jan 6, 2025
1 parent 9837875 commit e97a533
Showing 8 changed files with 53 additions and 28 deletions.
30 changes: 17 additions & 13 deletions cpp-ch/local-engine/CMakeLists.txt
@@ -21,9 +21,12 @@ endif()

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w -ffunction-sections -fdata-sections")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w -ffunction-sections -fdata-sections")
-set(CMAKE_SHARED_LINKER_FLAGS
-    "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-export-dynamic -Wl,--gc-sections")
+if(APPLE)
+  add_definitions(-D_GNU_SOURCE)
+else()
+  set(CMAKE_SHARED_LINKER_FLAGS
+      "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-export-dynamic -Wl,--gc-sections")
+endif()
if(COMPILER_CLANG AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 16)
set(CMAKE_SHARED_LINKER_FLAGS
"${CMAKE_SHARED_LINKER_FLAGS} --ld-path=${LLD_WRAPPER}")
@@ -157,17 +160,18 @@ target_link_libraries(

target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC ch_parquet)

-if(ENABLE_JEMALLOC)
-  target_link_options(
-    ${LOCALENGINE_SHARED_LIB} PRIVATE
-    -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/libch.map
-    -Wl,-Bsymbolic-functions)
-else()
-  target_link_options(
-    ${LOCALENGINE_SHARED_LIB} PRIVATE
-    -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/libch-hide-jemalloc.map)
+if(NOT APPLE)
+  if(ENABLE_JEMALLOC)
+    target_link_options(
+      ${LOCALENGINE_SHARED_LIB} PRIVATE
+      -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/libch.map
+      -Wl,-Bsymbolic-functions)
+  else()
+    target_link_options(
+      ${LOCALENGINE_SHARED_LIB} PRIVATE
+      -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/libch-hide-jemalloc.map)
+  endif()
+endif()

if("${CMAKE_BUILD_TYPE}" MATCHES "Debug")
set(LOCALENGINE_SHARED_LIB_NAME "libchd.so")
else()
4 changes: 3 additions & 1 deletion cpp-ch/local-engine/Common/GlutenSignalHandler.cpp
@@ -159,7 +159,9 @@ static void signalHandler(int sig, siginfo_t * info, void * context) noexcept

/// Avoid link time dependency on DB/Interpreters - will use this function only when linked.
__attribute__((__weak__)) void
-collectGlutenCrashLog(Int32 signal, UInt64 thread_id, const String & query_id, const StackTrace & stack_trace);
+collectGlutenCrashLog(Int32 signal, UInt64 thread_id, const String & query_id, const StackTrace & stack_trace) {
+
+}

class SignalListener : public Poco::Runnable
{
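A note on the weak-symbol change above: giving the __attribute__((__weak__)) declaration an empty body means the symbol is always defined, so the signal handler can reference it even in builds (such as this macOS one) where no strong definition of collectGlutenCrashLog is linked in, presumably because the Mach-O linker does not tolerate a declared-but-undefined weak function the way ELF does. A minimal standalone sketch of the idiom, with an illustrative function name, not code from the patch:

    // Hedged sketch: a weak default definition that a strong definition in
    // another translation unit overrides at link time.
    __attribute__((__weak__)) void reportCrash()
    {
        // Default no-op body used when no strong definition is linked in.
    }

    int main()
    {
        reportCrash(); // Safe to call unconditionally; the weak body is the fallback.
        return 0;
    }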
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp
@@ -964,10 +964,10 @@ jobject create(JNIEnv * env, const SparkRowInfo & spark_row_info)
{
auto * offsets_arr = env->NewLongArray(spark_row_info.getNumRows());
const auto * offsets_src = spark_row_info.getOffsets().data();
-env->SetLongArrayRegion(offsets_arr, 0, spark_row_info.getNumRows(), offsets_src);
+env->SetLongArrayRegion(offsets_arr, 0, spark_row_info.getNumRows(), static_cast<const jlong *>(offsets_src));
auto * lengths_arr = env->NewLongArray(spark_row_info.getNumRows());
const auto * lengths_src = spark_row_info.getLengths().data();
-env->SetLongArrayRegion(lengths_arr, 0, spark_row_info.getNumRows(), lengths_src);
+env->SetLongArrayRegion(lengths_arr, 0, spark_row_info.getNumRows(), static_cast<const jlong *>(lengths_src));
int64_t address = reinterpret_cast<int64_t>(spark_row_info.getBufferAddress());
int64_t column_number = spark_row_info.getNumCols();
int64_t total_size = spark_row_info.getTotalBytes();
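Context for the cast: JNIEnv::SetLongArrayRegion takes a const jlong * buffer, and jlong is not guaranteed to be the same fundamental type as the source's 64-bit element type on every platform (on macOS jlong is long long), which is presumably why the conversion is now spelled out. A small self-contained sketch (hypothetical helper, not from the patch) of one portable way to hand 64-bit native values to a Java long[]:

    // Hedged sketch: copy through a jlong buffer so the pointer type always
    // matches what SetLongArrayRegion expects, regardless of whether the
    // platform's jlong is long or long long.
    #include <jni.h>
    #include <cstdint>
    #include <vector>

    jlongArray makeLongArray(JNIEnv * env, const int64_t * src, jsize n)
    {
        jlongArray arr = env->NewLongArray(n);
        std::vector<jlong> tmp(src, src + n); // element-wise widening/conversion is always well-formed
        env->SetLongArrayRegion(arr, 0, n, tmp.data());
        return arr;
    }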
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
@@ -157,8 +157,8 @@ void restoreMetaData<LOCAL>(
return;

// Increase the speed of metadata recovery
-auto max_concurrency = std::max(10UL, QueryContext::globalContext()->getSettingsRef()[Setting::max_threads].value);
-auto max_threads = std::min(max_concurrency, not_exists_part.size());
+auto max_concurrency = std::max(static_cast<UInt64>(10), QueryContext::globalContext()->getSettingsRef()[Setting::max_threads].value);
+auto max_threads = std::min(max_concurrency, static_cast<UInt64>(not_exists_part.size()));
FreeThreadPool thread_pool(
CurrentMetrics::LocalThread,
CurrentMetrics::LocalThreadActive,
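The casts here address a template-deduction issue rather than behaviour: std::max and std::min deduce a single type from both arguments, so mixing 10UL (unsigned long) with a UInt64 setting value or a size_t container size fails to compile wherever those are distinct types, as on macOS where UInt64 is unsigned long long. A minimal sketch of the same pattern (function and parameter names are illustrative, not Gluten's):

    // Hedged sketch: keep both arguments of std::max/std::min on one type so
    // the single template parameter deduces cleanly on every platform.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>

    uint64_t pickThreadCount(uint64_t configured_max_threads, std::size_t pending_parts)
    {
        // std::max(10UL, configured_max_threads) can fail to compile where
        // unsigned long and unsigned long long are different types.
        uint64_t max_concurrency = std::max(static_cast<uint64_t>(10), configured_max_threads);
        return std::min(max_concurrency, static_cast<uint64_t>(pending_parts));
    }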
4 changes: 4 additions & 0 deletions cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
@@ -21,7 +21,9 @@
#include <Storages/ObjectStorage/HDFS/HDFSCommon.h>
#include <Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.h>
#include <Storages/Output/WriteBufferBuilder.h>
+#if USE_HDFS
#include <hdfs/hdfs.h>
+#endif
#include <Poco/URI.h>
#include <Common/CHUtil.h>

@@ -101,7 +103,9 @@ void registerWriteBufferBuilders()
auto & factory = WriteBufferBuilderFactory::instance();
//TODO: support azure and S3
factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared<LocalFileWriteBufferBuilder>(context_); });
+#if USE_HDFS
factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared<HDFSFileWriteBufferBuilder>(context_); });
+#endif
}

WriteBufferBuilderFactory & WriteBufferBuilderFactory::instance()
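Both hunks in this file use the usual compile-time feature guard: when the build does not define USE_HDFS (the macOS case in this patch), neither the hdfs.h include nor the "hdfs" builder registration is compiled, leaving only the "file" scheme. A toy, self-contained sketch of the same guard; the types and names below are illustrative, not Gluten's API:

    // Hedged sketch of a feature-guarded registry.
    #include <functional>
    #include <map>
    #include <string>

    using Builder = std::function<std::string()>;

    std::map<std::string, Builder> registerBuilders()
    {
        std::map<std::string, Builder> registry;
        registry["file"] = [] { return std::string("local file write buffer"); };
    #if USE_HDFS
        // Compiled only when the build system defines USE_HDFS to a non-zero value.
        registry["hdfs"] = [] { return std::string("hdfs write buffer"); };
    #endif
        return registry;
    }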
14 changes: 9 additions & 5 deletions cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
@@ -27,11 +27,15 @@ add_library(substrait_source ${substrait_source_sources})
target_compile_options(
substrait_source PRIVATE -Wno-suggest-destructor-override
-Wno-inconsistent-missing-destructor-override)

-target_link_libraries(
-  substrait_source PUBLIC boost::headers_only ch_contrib::protobuf
-                          clickhouse_common_io ch_contrib::hdfs substrait)
+if(ENABLE_HDFS)
+  target_link_libraries(
+    substrait_source PUBLIC boost::headers_only ch_contrib::protobuf
+                            clickhouse_common_io ch_contrib::hdfs substrait)
+else()
+  target_link_libraries(
+    substrait_source PUBLIC boost::headers_only ch_contrib::protobuf
+                            clickhouse_common_io substrait)
+endif()
target_include_directories(
substrait_source SYSTEM BEFORE
PUBLIC ${ARROW_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/contrib/arrow-cmake/cpp/src
19 changes: 15 additions & 4 deletions cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -43,7 +43,6 @@
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <boost/compute/detail/lru_cache.hpp>
-#include <hdfs/hdfs.h>
#include <sys/stat.h>
#include <Poco/Logger.h>
#include <Poco/URI.h>
@@ -64,6 +63,9 @@
#include <aws/s3/model/ListObjectsV2Request.h>
#endif

+#if USE_HDFS
+#include <hdfs/hdfs.h>
+#endif

namespace DB
{
@@ -205,12 +207,15 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> read_buffer, const s
file_info.start() + file_info.length(),
start_end.first,
start_end.second);

+#if USE_HDFS
    /// If read buffer doesn't support right bounded reads, wrap it with BoundedReadBuffer to enable right bounded reads.
    if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
        || dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
        read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
+#else
+    if (dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
+        read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
+#endif
read_buffer->seek(start_end.first, SEEK_SET);
read_buffer->setReadUntilPosition(start_end.second);
return std::move(read_buffer);
@@ -744,12 +749,18 @@ ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const subst
new_end);

std::unique_ptr<SeekableReadBuffer> bounded_in;
+#if USE_HDFS
    if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(seekable_in.get())
        || dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
        bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
    else
        bounded_in = std::move(seekable_in);
+#else
+    if (dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
+        bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
+    else
+        bounded_in = std::move(seekable_in);
+#endif
bounded_in->seek(new_start, SEEK_SET);
bounded_in->setReadUntilPosition(new_end);
bool first_block_need_special_process = (new_start > 0);
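In both hunks of this file the logic stays the same; only the set of buffer types checked changes. The dynamic_cast chain inspects the runtime type of the buffer and wraps it in BoundedReadBuffer only when it is a type that lacks right-bounded reads; with HDFS compiled out, the HDFS buffer types no longer exist, so only the local-file case remains. A simplified, self-contained sketch of the dispatch pattern; the classes below are stand-ins, not ClickHouse's:

    // Hedged sketch of type-based wrapping.
    #include <memory>

    struct ReadBuffer { virtual ~ReadBuffer() = default; };
    struct FileReadBuffer : ReadBuffer {};
    struct BoundedBuffer : ReadBuffer
    {
        explicit BoundedBuffer(std::unique_ptr<ReadBuffer> inner) : inner_(std::move(inner)) {}
        std::unique_ptr<ReadBuffer> inner_;
    };

    std::unique_ptr<ReadBuffer> wrapIfNeeded(std::unique_ptr<ReadBuffer> buf)
    {
        // dynamic_cast on the raw pointer checks the runtime type without
        // taking ownership; only matching types get wrapped.
        if (dynamic_cast<FileReadBuffer *>(buf.get()))
            return std::make_unique<BoundedBuffer>(std::move(buf));
        return buf;
    }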
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/local_engine_jni.cpp
@@ -1039,7 +1039,7 @@ JNIEXPORT jobject Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
local_engine::BlockStripes bs = local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_);

auto * addresses = env->NewLongArray(bs.block_addresses.size());
-env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), bs.block_addresses.data());
+env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), static_cast<const jlong *>(bs.block_addresses.data()));
auto * indices = env->NewIntArray(bs.heading_row_indice.size());
env->SetIntArrayRegion(indices, 0, bs.heading_row_indice.size(), bs.heading_row_indice.data());

