Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply patches to the Arrow to 16.1.0 #68

Merged
merged 11 commits into from
Oct 16, 2024
14 changes: 14 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ class ORCFileReader::Impl {
return Init();
}

// Returns the raw liborc reader pointer held by reader_.
// NOTE(review): presumably reader_ keeps ownership (the pointer comes from
// reader_.get()); confirm against the member's declaration.
virtual liborc::Reader* GetRawORCReader() {
  liborc::Reader* raw_reader = reader_.get();
  return raw_reader;
}


Status Init() {
int64_t nstripes = reader_->getNumberOfStripes();
stripes_.resize(static_cast<size_t>(nstripes));
Expand Down Expand Up @@ -559,6 +564,15 @@ ORCFileReader::ORCFileReader() { impl_.reset(new ORCFileReader::Impl()); }

// Defined out-of-line in this translation unit; nothing beyond member
// destruction is performed.
ORCFileReader::~ORCFileReader() = default;

// Forwards to the implementation object and hands back whatever raw
// liborc reader pointer it reports.
liborc::Reader* ORCFileReader::GetRawORCReader() {
  liborc::Reader* raw_reader = impl_->GetRawORCReader();
  return raw_reader;
}

// Status-returning convenience overload: opens the file through the
// Result-returning Open(file, pool) overload and moves the outcome into
// `*reader`, or returns the failure Status.
Status ORCFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
                           MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader) {
  auto opened = Open(file, pool);
  return std::move(opened).Value(reader);
}

Result<std::unique_ptr<ORCFileReader>> ORCFileReader::Open(
const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
#ifdef ARROW_ORC_NEED_TIME_ZONE_DATABASE_CHECK
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
#include "arrow/adapters/orc/util.h"

namespace arrow {
namespace adapters {
Expand All @@ -53,6 +54,19 @@ class ARROW_EXPORT ORCFileReader {
public:
~ORCFileReader();

/// \brief Creates a new ORC reader.
///
/// \param[in] file the data source
/// \param[in] pool a MemoryPool to use for buffer allocations
/// \param[out] reader the returned reader object
/// \return Status
ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool,
std::unique_ptr<ORCFileReader>* reader);

/// \brief Get the underlying liborc reader wrapped by this ORCFileReader.
///
/// \return a raw pointer to the wrapped liborc::Reader
///
/// NOTE(review): the implementation returns `reader_.get()`, so the pointer is
/// presumably non-owning and tied to the lifetime of this object -- confirm
/// before storing it or deleting it.
liborc::Reader* GetRawORCReader();

/// \brief Creates a new ORC reader
///
/// \param[in] file the data source
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,23 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
return total;
}

// Returns the row count of the i-th record batch, taken from the `length`
// field of the batch's flatbuffer RecordBatch header.
//
// The index is only validated with DCHECKs. Returns an IOError if the message
// header is not a RecordBatch, and propagates any error from reading or
// verifying the message.
Result<int64_t> RecordBatchCountRows(int i) override {
  DCHECK_GE(i, 0);
  DCHECK_LT(i, num_record_batches());

  // Load the message stored in the i-th record batch block.
  ARROW_ASSIGN_OR_RAISE(auto block_message,
                        ReadMessageFromBlock(GetRecordBatchBlock(i)));

  // Verify the flatbuffer-encoded metadata before touching its fields.
  auto header_buffer = block_message->metadata();
  const flatbuf::Message* fb_message = nullptr;
  RETURN_NOT_OK(internal::VerifyMessage(header_buffer->data(), header_buffer->size(),
                                        &fb_message));

  if (auto record_batch_header = fb_message->header_as_RecordBatch()) {
    return record_batch_header->length();
  }
  return Status::IOError(
      "Header-type of flatbuffer-encoded Message is not RecordBatch.");
}

Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
const IpcReadOptions& options) {
owned_file_ = file;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ class ARROW_EXPORT RecordBatchFileReader
/// \brief Computes the total number of rows in the file.
virtual Result<int64_t> CountRows() = 0;

/// \brief Return the number of rows in the i-th record batch of the file.
///
/// \param[in] i the index of the record batch
/// \return the number of rows in that record batch, or an error
///
/// NOTE(review): the implementation in reader.cc validates `i` only with
/// DCHECKs, so callers presumably must pass 0 <= i < num_record_batches().
virtual Result<int64_t> RecordBatchCountRows(int i) = 0;

/// \brief Begin loading metadata for the desired batches into memory.
///
/// This method will also begin loading all dictionaries messages into memory.
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
#pragma warning(push)
#pragma warning(disable : 4800)
#endif
if (ARROW_PREDICT_FALSE(*bit_offset >= 64)) {
auto msg = std::string("invalid bit offset: ") + std::to_string(*bit_offset);
msg += ", may be malformed num_bits: " + std::to_string(num_bits);
throw std::runtime_error(msg);
}
*v = static_cast<T>(bit_util::TrailingBits(*buffered_values, *bit_offset + num_bits) >>
*bit_offset);
#ifdef _MSC_VER
Expand Down
15 changes: 8 additions & 7 deletions cpp/src/arrow/util/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#endif
#include <cstdlib>
#include <iostream>
#include <sstream>

#ifdef ARROW_USE_GLOG

Expand Down Expand Up @@ -65,33 +66,33 @@ class CerrLog {
public:
explicit CerrLog(ArrowLogLevel severity) : severity_(severity), has_logged_(false) {}

virtual ~CerrLog() {
virtual ~CerrLog() noexcept(false) {
if (has_logged_) {
std::cerr << std::endl;
stream << std::endl;
}
if (severity_ == ArrowLogLevel::ARROW_FATAL) {
PrintBackTrace();
std::abort();
throw std::runtime_error(stream.str());
}
}

std::ostream& Stream() {
has_logged_ = true;
return std::cerr;
return stream;
}

template <class T>
CerrLog& operator<<(const T& t) {
if (severity_ != ArrowLogLevel::ARROW_DEBUG) {
has_logged_ = true;
std::cerr << t;
stream << t;
}
return *this;
}

protected:
const ArrowLogLevel severity_;
bool has_logged_;
std::stringstream stream;

void PrintBackTrace() {
#ifdef ARROW_WITH_BACKTRACE
Expand Down Expand Up @@ -248,7 +249,7 @@ std::ostream& ArrowLog::Stream() {

bool ArrowLog::IsEnabled() const { return is_enabled_; }

ArrowLog::~ArrowLog() {
ArrowLog::~ArrowLog() noexcept(false) {
if (logging_provider_ != nullptr) {
delete reinterpret_cast<LoggingProvider*>(logging_provider_);
logging_provider_ = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ enum class ArrowLogLevel : int {
// This is also a null log which does not output anything.
class ARROW_EXPORT ArrowLogBase {
public:
virtual ~ArrowLogBase() {}
virtual ~ArrowLogBase() noexcept(false) {}

virtual bool IsEnabled() const { return false; }

Expand All @@ -176,7 +176,7 @@ class ARROW_EXPORT ArrowLogBase {
class ARROW_EXPORT ArrowLog : public ArrowLogBase {
public:
ArrowLog(const char* file_name, int line_number, ArrowLogLevel severity);
~ArrowLog() override;
~ArrowLog() noexcept(false) override;

/// Return whether or not current logging instance is enabled.
///
Expand Down
33 changes: 28 additions & 5 deletions cpp/src/arrow/util/mutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <mutex>

#ifndef _WIN32
#if !defined( _WIN32) && !defined(__ppc64__)
#include <pthread.h>
#include <atomic>
#endif
Expand Down Expand Up @@ -59,7 +59,7 @@ Mutex::Guard Mutex::Lock() {

Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}

#ifndef _WIN32
#if !defined( _WIN32) && !defined(__ppc64__)
namespace {

struct AfterForkState {
Expand All @@ -71,19 +71,42 @@ struct AfterForkState {
// The leak (only in child processes) is a small price to pay for robustness.
Mutex* mutex = nullptr;

enum State {
INITIALIZED,
IN_PROCESS,
NOT_INITIALIZED,
};

std::atomic_int state = INITIALIZED;

private:
// Registers AfterFork as the child-side pthread_atfork handler. No prepare or
// parent handlers are installed, and the return value of pthread_atfork is
// not checked.
AfterForkState() {
pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
}

static void AfterFork() { instance.mutex = new Mutex; }
static void AfterFork() { instance.state.store(NOT_INITIALIZED); }

};

AfterForkState AfterForkState::instance;
} // namespace

Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
#endif // _WIN32
// Returns the mutex stored in the AfterForkState singleton.
//
// While the state is INITIALIZED the stored pointer is returned as-is.
// Otherwise the caller that moves the state from NOT_INITIALIZED to
// IN_PROCESS allocates a new Mutex and marks the state INITIALIZED; any other
// caller spins until the state becomes INITIALIZED.
Mutex* GlobalForkSafeMutex() {
  auto& fork_state = AfterForkState::instance;

  if (fork_state.state.load() != AfterForkState::State::INITIALIZED) {
    int expected_state = AfterForkState::State::NOT_INITIALIZED;
    const bool won_race = fork_state.state.compare_exchange_strong(
        expected_state, AfterForkState::State::IN_PROCESS);
    if (won_race) {
      // This caller allocates the mutex and then publishes it via the state.
      fork_state.mutex = new Mutex;
      fork_state.state.store(AfterForkState::State::INITIALIZED);
    } else {
      // Another caller is allocating the mutex; wait for it to finish.
      while (fork_state.state.load() != AfterForkState::State::INITIALIZED) {
      }
    }
  }

  return fork_state.mutex;
}
#endif // _WIN32 and __ppc64__

} // namespace util
} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ARROW_EXPORT Mutex {
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
};

#ifndef _WIN32
#if !defined(_WIN32) && !defined(__ppc64__)
/// Return a pointer to a process-wide, process-specific Mutex that can be used
/// at any point in a child process. NULL is returned when called in the parent.
///
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,23 @@ Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels,
return Status::Invalid("Key-value map node must have 1 or 2 child elements. Found: ",
key_value.field_count());
}

/*
 * NOTE: the "map keys must be required" check is intentionally disabled.
 *
 * If a Parquet file was written by Flink, the key type of a map column is
 * allowed to be optional, like this:
 *   optional group event_info (MAP) {
 *     repeated group key_value {
 *       optional binary key (UTF8);
 *       optional binary value (UTF8);
 *     }
 *   }
 *
 * Refer to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/types/#constructured-data-types
 *
 * Original check, kept here for reference:
 *   const Node& key_node = *key_value.field(0);
 *   if (!key_node.is_required()) {
 *     return Status::Invalid("Map keys must be annotated as required.");
 *   }
 */

// Arrow doesn't support 1 column maps (i.e. Sets). The options are to either
// make the values column nullable, or process the map as a list. We choose the latter
// as it is simpler.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2721,7 +2721,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
T min_delta_;
uint32_t mini_block_idx_;
std::shared_ptr<ResizableBuffer> delta_bit_widths_;
int delta_bit_width_;
int delta_bit_width_ = 0;

T last_value_;
};
Expand Down
Loading