Skip to content

Commit

Permalink
apacheGH-37173: [C++][Go][Format] C-export/import Run-End Encoded Arr…
Browse files Browse the repository at this point in the history
…ays (apache#37174)

### Rationale for this change

All array types should be C-exportable.

It uses `"+r"` as the format string. `+` because it's a nested format and `r` was not taken and seems like the obvious choice.

### What changes are included in this PR?

- [x] Ability to C-import/expor REE arrays.
- [x] Reference update

### Are these changes tested?

Yes.
* Closes: apache#37173

Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
felipecrv and zeroshade authored Aug 24, 2023
1 parent b9453a2 commit 9958896
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 17 deletions.
31 changes: 31 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ struct SchemaExporter {
return Status::OK();
}

Status Visit(const RunEndEncodedType& type) { return SetFormat("+r"); }

ExportedSchemaPrivateData export_;
int64_t flags_ = 0;
std::vector<std::pair<std::string, std::string>> additional_metadata_;
Expand Down Expand Up @@ -1106,6 +1108,8 @@ struct SchemaImporter {
return ProcessMap();
case 'u':
return ProcessUnion();
case 'r':
return ProcessREE();
}
return f_parser_.Invalid();
}
Expand Down Expand Up @@ -1280,6 +1284,22 @@ struct SchemaImporter {
return Status::OK();
}

Status ProcessREE() {
RETURN_NOT_OK(f_parser_.CheckAtEnd());
RETURN_NOT_OK(CheckNumChildren(2));
ARROW_ASSIGN_OR_RAISE(auto run_ends_field, MakeChildField(0));
ARROW_ASSIGN_OR_RAISE(auto values_field, MakeChildField(1));
if (!is_run_end_type(run_ends_field->type()->id())) {
return Status::Invalid("Expected a valid run-end integer type, but struct has ",
run_ends_field->type()->ToString());
}
if (values_field->type()->id() == Type::RUN_END_ENCODED) {
return Status::Invalid("ArrowArray struct contains a nested run-end encoded array");
}
type_ = run_end_encoded(run_ends_field->type(), values_field->type());
return Status::OK();
}

Result<std::shared_ptr<Field>> MakeChildField(int64_t child_id) {
const auto& child = child_importers_[child_id];
if (child.c_struct_->name == nullptr) {
Expand Down Expand Up @@ -1601,6 +1621,17 @@ struct ArrayImporter {
return Status::OK();
}

Status Visit(const RunEndEncodedType& type) {
RETURN_NOT_OK(CheckNumChildren(2));
RETURN_NOT_OK(CheckNumBuffers(0));
RETURN_NOT_OK(AllocateArrayData());
// Always have a null bitmap buffer as much of the code in arrow assumes
// the buffers vector to have at least one entry on every array format.
data_->buffers.emplace_back(nullptr);
data_->null_count = 0;
return Status::OK();
}

Status ImportFixedSizePrimitive(const FixedWidthType& type) {
RETURN_NOT_OK(CheckNoChildren());
RETURN_NOT_OK(CheckNumBuffers(2));
Expand Down
230 changes: 225 additions & 5 deletions cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"

// TODO(GH-37221): Remove these ifdef checks when compute dependency is removed
#ifdef ARROW_COMPUTE
#include "arrow/compute/api_vector.h"
#endif

namespace arrow {

using internal::ArrayExportGuard;
Expand Down Expand Up @@ -443,6 +448,20 @@ TEST_F(TestSchemaExport, Union) {
{ARROW_FLAG_NULLABLE});
}

#ifdef ARROW_COMPUTE
TEST_F(TestSchemaExport, RunEndEncoded) {
TestNested(run_end_encoded(int16(), uint8()), {"+r", "s", "C"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
TestNested(run_end_encoded(int32(), float64()), {"+r", "i", "g"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
TestNested(run_end_encoded(int64(), utf8()), {"+r", "l", "u"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
TestNested(run_end_encoded(int32(), list(utf8())), {"+r", "i", "+l", "u"},
{"", "run_ends", "values", "item"},
{ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE, ARROW_FLAG_NULLABLE});
}
#endif

std::string GetIndexFormat(Type::type type_id) {
switch (type_id) {
case Type::UINT8:
Expand Down Expand Up @@ -952,6 +971,36 @@ TEST_F(TestArrayExport, Union) {
TestNested(type, data);
}

#ifdef ARROW_COMPUTE
Result<std::shared_ptr<Array>> REEFromJSON(const std::shared_ptr<DataType>& ree_type,
const std::string& json) {
auto ree_type_ptr = checked_cast<const RunEndEncodedType*>(ree_type.get());
auto array = ArrayFromJSON(ree_type_ptr->value_type(), json);
ARROW_ASSIGN_OR_RAISE(
auto datum,
RunEndEncode(array, compute::RunEndEncodeOptions{ree_type_ptr->run_end_type()}));
return datum.make_array();
}

TEST_F(TestArrayExport, RunEndEncoded) {
auto factory = []() {
return REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]");
};
TestNested(factory);
}

TEST_F(TestArrayExport, RunEndEncodedSliced) {
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto ree_array,
REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]"));
return ree_array->Slice(1, 5);
};
TestNested(factory);
}
#endif

TEST_F(TestArrayExport, Dictionary) {
{
auto factory = []() {
Expand Down Expand Up @@ -1269,6 +1318,17 @@ class TestDeviceArrayExport : public ::testing::Test {
return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
}

#ifdef ARROW_COMPUTE
static std::function<Result<std::shared_ptr<Array>>()> JSONREEArrayFactory(
const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
const char* json) {
return [=]() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto result, REEFromJSON(type, json));
return ToDevice(mm, *result->data());
};
}
#endif

template <typename ArrayFactory, typename ExportCheckFunc>
void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) {
auto orig_bytes = pool_->bytes_allocated();
Expand Down Expand Up @@ -1465,6 +1525,17 @@ TEST_F(TestDeviceArrayExport, Union) {
TestNested(mm, type, data);
}

#ifdef ARROW_COMPUTE
TEST_F(TestDeviceArrayExport, RunEndEncoded) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto mm = device->default_memory_manager();

auto type = run_end_encoded(int32(), int32());
const char* data = "[1, null, 2, 2, 4, 5]";
TestNested(JSONREEArrayFactory(mm, type, data));
}
#endif

TEST_F(TestDeviceArrayExport, Extension) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto mm = device->default_memory_manager();
Expand Down Expand Up @@ -1564,11 +1635,10 @@ class SchemaStructBuilder {

// Create a new ArrowSchema struct with a stable C pointer
struct ArrowSchema* AddChild() {
nested_structs_.emplace_back();
struct ArrowSchema* result = &nested_structs_.back();
memset(result, 0, sizeof(*result));
result->release = NoOpSchemaRelease;
return result;
auto& result = nested_structs_.emplace_back();
memset(&result, 0, sizeof(result));
result.release = NoOpSchemaRelease;
return &result;
}

// Create a stable C pointer to the N last structs in nested_structs_
Expand Down Expand Up @@ -1620,6 +1690,17 @@ class SchemaStructBuilder {
c->children = NLastChildren(c->n_children, c);
}

void FillRunEndEncoded(struct ArrowSchema* c, const char* format,
const char* name = nullptr, int64_t flags = kDefaultFlags) {
c->flags = flags;
c->format = format;
c->name = name;
c->n_children = 2;
c->children = NLastChildren(2, c);
c->children[0]->name = "run_ends";
c->children[1]->name = "values";
}

void FillPrimitive(const char* format, const char* name = nullptr,
int64_t flags = kDefaultFlags) {
FillPrimitive(&c_struct_, format, name, flags);
Expand All @@ -1637,6 +1718,11 @@ class SchemaStructBuilder {
FillStructLike(&c_struct_, format, n_children, name, flags);
}

void FillRunEndEncoded(const char* format, const char* name = nullptr,
int64_t flags = kDefaultFlags) {
FillRunEndEncoded(&c_struct_, format, name, flags);
}

struct ArrowSchema c_struct_;
// Deque elements don't move when the deque is appended to, which allows taking
// stable C pointers to them.
Expand Down Expand Up @@ -1902,6 +1988,15 @@ TEST_F(TestSchemaImport, Map) {
CheckImport(expected);
}

#ifdef ARROW_COMPUTE
TEST_F(TestSchemaImport, RunEndEncoded) {
FillPrimitive(AddChild(), "s", "run_ends");
FillPrimitive(AddChild(), "I", "values");
FillRunEndEncoded("+r");
CheckImport(run_end_encoded(int16(), uint32()));
}
#endif

TEST_F(TestSchemaImport, Dictionary) {
FillPrimitive(AddChild(), "u");
FillPrimitive("c");
Expand Down Expand Up @@ -2021,6 +2116,33 @@ TEST_F(TestSchemaImport, UnionError) {
CheckImportError();
}

TEST_F(TestSchemaImport, RunEndEncodedError) {
// Bad run-end type
FillPrimitive(AddChild(), "c", "run_ends");
FillPrimitive(AddChild(), "u", "values");
FillRunEndEncoded("+r");
CheckImportError();

// REE of a REE also causes an error
ArrowSchema* run_ends = AddChild();
ArrowSchema* values;
FillPrimitive(run_ends, "i", "run_ends");
{
FillPrimitive(AddChild(), "i", "run_ends");
FillPrimitive(AddChild(), "u", "values");
values = AddChild();
FillRunEndEncoded(values, "+r", "values");
}
// Fill the top-level REE
ArrowSchema* children[2] = {run_ends, values};
c_struct_.flags = kDefaultFlags;
c_struct_.format = "+r";
c_struct_.name = "";
c_struct_.n_children = 2;
c_struct_.children = children;
CheckImportError();
}

TEST_F(TestSchemaImport, DictionaryError) {
// Bad index type
FillPrimitive(AddChild(), "c");
Expand Down Expand Up @@ -2178,6 +2300,10 @@ static const void* timestamp_buffers_no_nulls2[2] = {nullptr, timestamp_data_buf
static const void* timestamp_buffers_no_nulls3[2] = {nullptr, timestamp_data_buffer3};
static const void* timestamp_buffers_no_nulls4[2] = {nullptr, timestamp_data_buffer4};

static const uint16_t run_ends_data_buffer5[5] = {1, 2, 4, 7, 9};
[[maybe_unused]] static const void* run_ends_buffers5[2] = {nullptr,
run_ends_data_buffer5};

static const uint8_t string_data_buffer1[] = "foobarquuxxyzzy";

static const int32_t string_offsets_buffer1[] = {0, 3, 3, 6, 10, 15};
Expand Down Expand Up @@ -2354,6 +2480,20 @@ class TestArrayImport : public ::testing::Test {
legacy);
}

void FillRunEndEncoded(int64_t length, int64_t offset) {
FillRunEndEncoded(&c_struct_, length, offset);
}

void FillRunEndEncoded(struct ArrowArray* c, int64_t length, int64_t offset) {
c->length = length;
c->null_count = 0;
c->offset = offset;
c->n_buffers = 0;
c->buffers = nullptr;
c->n_children = 2;
c->children = NLastChildren(2, c);
}

void CheckImport(const std::shared_ptr<Array>& expected) {
ArrayReleaseCallback cb(&c_struct_);

Expand Down Expand Up @@ -2704,6 +2844,51 @@ TEST_F(TestArrayImport, Struct) {
CheckImport(expected);
}

#ifdef ARROW_COMPUTE
TEST_F(TestArrayImport, RunEndEncoded) {
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(9, 0);
ASSERT_OK_AND_ASSIGN(auto expected,
REEFromJSON(run_end_encoded(int16(), float32()),
"[0.0, 1.5, -2.0, -2.0, 3.0, 3.0, 3.0, 4.0, 4.0]"));
ASSERT_OK(expected->ValidateFull());
CheckImport(expected);
}

TEST_F(TestArrayImport, RunEndEncodedWithOffset) {
auto ree_type = run_end_encoded(int16(), float32());
// Offset in children
FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5);
FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5);
FillRunEndEncoded(7, 0);
ASSERT_OK_AND_ASSIGN(auto expected,
REEFromJSON(ree_type, "[-2.0, -2.0, -2.0, -2.0, 3.0, 3.0, 3.0]"));
CheckImport(expected);

// Ofsset in parent
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(5, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0, 3.0]"));
CheckImport(expected);

// Length in parent that cuts last run
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(4, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]"));
CheckImport(expected);

// Offset in both children and parent
FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5);
FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5);
FillRunEndEncoded(4, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]"));
CheckImport(expected);
}
#endif

TEST_F(TestArrayImport, SparseUnion) {
auto type = sparse_union({field("strs", utf8()), field("ints", int8())}, {43, 42});
auto expected =
Expand Down Expand Up @@ -3209,6 +3394,13 @@ TEST_F(TestSchemaRoundtrip, Union) {
TestWithTypeFactory([&]() { return dense_union({f1, f2}, type_codes); });
}

#ifdef ARROW_COMPUTE
TEST_F(TestSchemaRoundtrip, RunEndEncoded) {
TestWithTypeFactory([]() { return run_end_encoded(int16(), float32()); });
TestWithTypeFactory([]() { return run_end_encoded(int32(), list(float32())); });
}
#endif

TEST_F(TestSchemaRoundtrip, Dictionary) {
for (auto index_ty : all_dictionary_index_types()) {
TestWithTypeFactory([&]() { return dictionary(index_ty, utf8()); });
Expand Down Expand Up @@ -3500,6 +3692,34 @@ TEST_F(TestArrayRoundtrip, Union) {
}
}

#ifdef ARROW_COMPUTE
TEST_F(TestArrayRoundtrip, RunEndEncoded) {
{
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto ree_array,
REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]"));
return ree_array->Slice(1, 5);
};
TestWithArrayFactory(factory);
}
{
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(
auto ree_array,
RunEndEncodedArray::Make(
run_end_encoded(int64(), list(utf8())), 8,
ArrayFromJSON(int64(), "[1, 3, 4, 7, 8]"),
ArrayFromJSON(list(utf8()),
R"([["abc", "def"], ["efg"], [], null, ["efg", "hij"]])")));
RETURN_NOT_OK(ree_array->ValidateFull());
return ree_array;
};
TestWithArrayFactory(factory);
}
}
#endif

TEST_F(TestArrayRoundtrip, Dictionary) {
{
auto factory = []() {
Expand Down
Loading

0 comments on commit 9958896

Please sign in to comment.