Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41560: [C++] ChunkResolver: Implement ResolveMany and add unit tests #41561

Merged
merged 11 commits into from
May 15, 2024
79 changes: 75 additions & 4 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

#include "arrow/array.h"
#include "arrow/record_batch.h"

namespace arrow {
namespace internal {
namespace arrow::internal {

namespace {
template <typename T>
Expand Down Expand Up @@ -54,6 +54,50 @@ inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
offsets[chunks.size()] = offset;
return offsets;
}

/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
template <typename IndexType>
void ResolveManyInline(size_t num_offsets, const int64_t* offsets, int64_t n_indices,
const IndexType* logical_index_vec, IndexType* out_chunk_index_vec,
IndexType chunk_hint, IndexType* out_index_in_chunk_vec) {
const auto num_chunks = static_cast<IndexType>(num_offsets - 1);
// chunk_hint in [0, num_offsets) per the precondition.
for (int64_t i = 0; i < n_indices; i++) {
const auto index = static_cast<uint64_t>(logical_index_vec[i]);
if (index >= static_cast<uint64_t>(offsets[chunk_hint]) &&
(chunk_hint == num_chunks ||
felipecrv marked this conversation as resolved.
Show resolved Hide resolved
index < static_cast<uint64_t>(offsets[chunk_hint + 1]))) {
out_chunk_index_vec[i] = chunk_hint; // hint is correct!
continue;
}
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
auto chunk_index =
ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
chunk_hint = static_cast<IndexType>(chunk_index);
out_chunk_index_vec[i] = chunk_hint;
}
if (out_index_in_chunk_vec != NULLPTR) {
for (int64_t i = 0; i < n_indices; i++) {
auto logical_index = logical_index_vec[i];
auto chunk_index = out_chunk_index_vec[i];
// chunk_index is in [0, chunks.size()] no matter what the
// value of logical_index is, so it's always safe to dereference
// offset_ as it contains chunks.size()+1 values.
out_index_in_chunk_vec[i] =
logical_index - static_cast<IndexType>(offsets[chunk_index]);
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
// Make it more likely that Valgrind/ASAN can catch an invalid memory
// access by poisoning out_index_in_chunk_vec[i] when the logical
// index is out-of-bounds.
if (chunk_index == num_chunks) {
out_index_in_chunk_vec[i] = std::numeric_limits<IndexType>::max();
}
#endif
}
}
}

} // namespace

ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
Expand Down Expand Up @@ -84,5 +128,32 @@ ChunkResolver& ChunkResolver::operator=(const ChunkResolver& other) noexcept {
return *this;
}

} // namespace internal
} // namespace arrow
void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint8_t* logical_index_vec,
uint8_t* out_chunk_index_vec, uint8_t chunk_hint,
uint8_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
uint32_t* out_chunk_index_vec, uint32_t chunk_hint,
uint32_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint16_t* logical_index_vec,
uint16_t* out_chunk_index_vec, uint16_t chunk_hint,
uint16_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint64_t* logical_index_vec,
uint64_t* out_chunk_index_vec, uint64_t chunk_hint,
uint64_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

} // namespace arrow::internal
114 changes: 107 additions & 7 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <type_traits>
#include <vector>

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"

namespace arrow::internal {
felipecrv marked this conversation as resolved.
Show resolved Hide resolved

struct ChunkResolver;

struct ChunkLocation {
/// \brief Index of the chunk in the array of chunks
///
Expand All @@ -36,8 +40,17 @@ struct ChunkLocation {

/// \brief Index of the value in the chunk
///
/// The value is undefined if chunk_index >= chunks.size()
/// The value is UNDEFINED if chunk_index >= chunks.size()
int64_t index_in_chunk = 0;

ChunkLocation() = default;

/// \brief Build a location from an explicit chunk index and within-chunk index.
ChunkLocation(int64_t chunk_index, int64_t index_in_chunk)
    : chunk_index{chunk_index}, index_in_chunk{index_in_chunk} {}

/// \brief Two locations are equal when both the chunk index and the index
/// within the chunk match.
bool operator==(ChunkLocation other) const {
  if (chunk_index != other.chunk_index) {
    return false;
  }
  return index_in_chunk == other.index_in_chunk;
}
};

/// \brief A utility that incrementally resolves logical indices into
Expand All @@ -60,12 +73,35 @@ struct ARROW_EXPORT ChunkResolver {
explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;
explicit ChunkResolver(const RecordBatchVector& batches) noexcept;

/// \brief Construct a ChunkResolver directly from precomputed chunk offsets.
///
/// `offsets` must hold chunks.size() + 1 entries. The first entry must be 0,
/// every entry before the last is the logical index at which the corresponding
/// chunk starts, and the last entry is the logical length of the chunked array.
///
/// When NDEBUG is not defined, the constructor asserts that the vector is
/// non-empty, starts at 0 and is non-decreasing.
explicit ChunkResolver(std::vector<int64_t> offsets) noexcept
    : offsets_(std::move(offsets)), cached_chunk_(0) {
#ifndef NDEBUG
  assert(!offsets_.empty());
  assert(offsets_.front() == 0);
  for (size_t next = 1; next < offsets_.size(); ++next) {
    const size_t prev = next - 1;
    assert(offsets_[prev] <= offsets_[next]);
  }
#endif
}

ChunkResolver(ChunkResolver&& other) noexcept;
ChunkResolver& operator=(ChunkResolver&& other) noexcept;

ChunkResolver(const ChunkResolver& other) noexcept;
ChunkResolver& operator=(const ChunkResolver& other) noexcept;

/// \brief Logical length of the chunked array (the last entry of the offsets).
int64_t logical_array_length() const {
  const int64_t total_length = offsets_[offsets_.size() - 1];
  return total_length;
}
/// \brief Number of chunks, i.e. one less than the number of stored offsets.
int64_t num_chunks() const {
  const auto num_offsets = static_cast<int64_t>(offsets_.size());
  return num_offsets - 1;
}

/// \brief Length of the chunk at `chunk_index`, computed as the difference
/// between the offset of the next chunk and the offset of this one.
///
/// No bounds check is performed on `chunk_index`.
int64_t chunk_length(int64_t chunk_index) const {
  const int64_t chunk_begin = offsets_[chunk_index];
  const int64_t chunk_end = offsets_[chunk_index + 1];
  return chunk_end - chunk_begin;
}

/// \brief Resolve a logical index to a ChunkLocation.
///
/// The returned ChunkLocation contains the chunk index and the within-chunk index
Expand All @@ -81,7 +117,7 @@ struct ARROW_EXPORT ChunkResolver {
const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed);
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/true>(index, cached_chunk);
return {chunk_index, index - offsets_[chunk_index]};
return ChunkLocation{chunk_index, index - offsets_[chunk_index]};
}

/// \brief Resolve a logical index to a ChunkLocation.
Expand All @@ -97,12 +133,67 @@ struct ARROW_EXPORT ChunkResolver {
/// \return ChunkLocation with a valid chunk_index if index is within
/// bounds, or with chunk_index == chunks.size() if logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation ResolveWithChunkIndexHint(int64_t index,
ChunkLocation hint) const {
inline ChunkLocation ResolveWithHint(int64_t index, ChunkLocation hint) const {
assert(hint.chunk_index < static_cast<int64_t>(offsets_.size()));
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/false>(index, hint.chunk_index);
return {chunk_index, index - offsets_[chunk_index]};
return ChunkLocation{chunk_index, index - offsets_[chunk_index]};
}

/// \brief Resolve `n_indices` logical indices to chunk indices.
///
/// \pre 0 <= logical_index_vec[i] < logical_array_length()
///      (for well-defined and valid chunk index results)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean logical_array_length() rather than n?

/// \pre out_chunk_index_vec has space for `n_indices`
/// \pre chunk_hint in [0, chunks.size()]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it rather a pre-condition?

/// \post out_chunk_index_vec[i] in [0, chunks.size()] for i in [0, n_indices)
/// \post if logical_index_vec[i] >= chunked_array.length(), then
/// out_chunk_index_vec[i] == chunks.size()
/// and out_index_in_chunk_vec[i] is UNDEFINED (can be out-of-bounds)
/// \post if logical_index_vec[i] < 0, then both out_chunk_index_vec[i] and
/// out_index_in_chunk_vec[i] are UNDEFINED
///
/// \param n_indices The number of logical indices to resolve
/// \param logical_index_vec The logical indices to resolve
/// \param out_chunk_index_vec The output array where the chunk indices will be written
/// \param chunk_hint 0 or the last chunk_index produced by ResolveMany
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaning the caller is supposed to pass the value of out_chunk_index_vec[n_indices - 1] from the previous call to ResolveMany?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That's the plan. Or 0 if n_indices == 0. That's why I didn't want to write the formula.

/// \param out_index_in_chunk_vec If not NULLPTR, the output array where the
/// within-chunk indices will be written
/// \return false iff chunks.size() > std::numeric_limits<IndexType>::max()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming ResolveMany will be invoked in batches? This condition doesn't need to be checked in each ResolveMany call, only once for each call to the larger operation (such as Take). That's not necessarily a problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming ResolveMany will be invoked in batches?

It could be. I'm not doing it like that for Take at the moment. It's a very predictable branch though and I'm allowing it to be inlined at the caller.

template <typename IndexType>
[[nodiscard]] bool ResolveMany(int64_t n_indices, const IndexType* logical_index_vec,
IndexType* out_chunk_index_vec, IndexType chunk_hint = 0,
IndexType* out_index_in_chunk_vec = NULLPTR) const {
if constexpr (sizeof(IndexType) < sizeof(uint64_t)) {
// The max value returned by Bisect is `offsets.size() - 1` (= chunks.size()).
constexpr uint64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
// A ChunkedArray with enough empty chunks can make the index of a chunk
// exceed the logical index and thus the maximum value of IndexType.
const bool chunk_index_fits_on_type =
static_cast<uint64_t>(offsets_.size() - 1) <= kMaxIndexTypeValue;
if (ARROW_PREDICT_FALSE(!chunk_index_fits_on_type)) {
return false;
}
felipecrv marked this conversation as resolved.
Show resolved Hide resolved
// Since an index-in-chunk cannot possibly exceed the logical index being
// queried, we don't have to worry about these values not fitting on IndexType.
}
if constexpr (std::is_signed_v<IndexType>) {
// We interpret signed integers as unsigned and avoid having to generate double
// the amount of binary code to handle each integer width.
//
// Negative logical indices become large values when reinterpreted as unsigned.
// ResolveManyImpl still handles them gracefully, but depending on the chunk
// lengths they may or may not land within bounds, so both the chunk index and
// the index-in-chunk values are undefined in these cases.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't seem to be the case: if a logical index is negative, its unsigned counterpart will be out of bounds and the corresponding out_chunk_index_vec value will therefore be equal to chunks_.size().

Copy link
Contributor Author

@felipecrv felipecrv May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. The overflow check guarantees it's impossible for negative logical indices to become valid indices.

Not really. INT8_MIN becomes 128 when cast to uint8_t, -1 becomes 255, so depending on the chunks, they won't be an out-of-bounds logical indices.

I'm tweaking the comment here and improving the tests.

using U = std::make_unsigned_t<IndexType>;
ResolveManyImpl(n_indices, reinterpret_cast<const U*>(logical_index_vec),
reinterpret_cast<U*>(out_chunk_index_vec),
static_cast<U>(chunk_hint),
reinterpret_cast<U*>(out_index_in_chunk_vec));
} else {
static_assert(std::is_unsigned_v<IndexType>);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_index_vec, chunk_hint,
out_index_in_chunk_vec);
}
return true;
}

private:
Expand Down Expand Up @@ -130,13 +221,22 @@ struct ARROW_EXPORT ChunkResolver {
return chunk_index;
}

/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
void ResolveManyImpl(int64_t, const uint8_t*, uint8_t*, uint8_t, uint8_t*) const;
void ResolveManyImpl(int64_t, const uint16_t*, uint16_t*, uint16_t, uint16_t*) const;
void ResolveManyImpl(int64_t, const uint32_t*, uint32_t*, uint32_t, uint32_t*) const;
void ResolveManyImpl(int64_t, const uint64_t*, uint64_t*, uint64_t, uint64_t*) const;

public:
/// \brief Find the index of the chunk that contains the logical index.
///
/// Any non-negative index is accepted. When `hi=num_offsets`, the largest
/// possible return value is `num_offsets-1` which is equal to
/// `chunks.size()`. The is returned when the logical index is out-of-bounds.
/// `chunks.size()`, which is returned when the logical index is greater than or
/// equal to the logical length of the chunked array.
///
/// \pre index >= 0
/// \pre index >= 0 (otherwise, when index is negative, lo is returned)
/// \pre lo < hi
/// \pre lo >= 0 && hi <= offsets_.size()
static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo,
Expand Down
Loading
Loading