From b86304a7c1f3a5bee01910dd2b196dec54303ad6 Mon Sep 17 00:00:00 2001 From: Elssky <43638383+Elssky@users.noreply.github.com> Date: Mon, 11 Nov 2024 09:45:36 +0800 Subject: [PATCH] feat(c++): label filtering API, benchmarks, and examples (#654) --- cpp/CMakeLists.txt | 1 + cpp/benchmarks/benchmark_util.h | 6 + cpp/benchmarks/label_filter_benchmark.cc | 136 ++++++++++ cpp/examples/label_filtering_example.cc | 95 +++++++ cpp/src/graphar/high-level/graph_reader.cc | 292 ++++++++++++++++++++- cpp/src/graphar/high-level/graph_reader.h | 188 +++++++++++-- cpp/src/graphar/label.cc | 113 ++++++++ cpp/src/graphar/label.h | 62 +++++ 8 files changed, 874 insertions(+), 19 deletions(-) create mode 100644 cpp/benchmarks/label_filter_benchmark.cc create mode 100644 cpp/examples/label_filtering_example.cc create mode 100644 cpp/src/graphar/label.cc create mode 100644 cpp/src/graphar/label.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index cd12d486c..730c46c07 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -477,6 +477,7 @@ if (BUILD_BENCHMARKS) target_link_libraries(${target} PRIVATE benchmark::benchmark_main graphar ${CMAKE_DL_LIBS}) endmacro() add_benchmark(arrow_chunk_reader_benchmark SRCS benchmarks/arrow_chunk_reader_benchmark.cc) + add_benchmark(label_filter_benchmark SRCS benchmarks/label_filter_benchmark.cc) add_benchmark(graph_info_benchmark SRCS benchmarks/graph_info_benchmark.cc) endif() diff --git a/cpp/benchmarks/benchmark_util.h b/cpp/benchmarks/benchmark_util.h index 19cd47377..6c0494ff1 100644 --- a/cpp/benchmarks/benchmark_util.h +++ b/cpp/benchmarks/benchmark_util.h @@ -41,6 +41,10 @@ class BenchmarkFixture : public ::benchmark::Fixture { path_ = std::string(c_root) + "/ldbc_sample/parquet/ldbc_sample.graph.yml"; auto maybe_graph_info = GraphInfo::Load(path_); graph_info_ = maybe_graph_info.value(); + + second_path_ = std::string(c_root) + "/ldbc/parquet/ldbc.graph.yml"; + auto second_maybe_graph_info = GraphInfo::Load(second_path_); + second_graph_info_ = second_maybe_graph_info.value(); } void TearDown(const ::benchmark::State& state) override {} @@ -48,5 +52,7 @@ class BenchmarkFixture : public ::benchmark::Fixture { protected: std::string path_; std::shared_ptr graph_info_; + std::string second_path_; + std::shared_ptr second_graph_info_; }; } // namespace graphar diff --git a/cpp/benchmarks/label_filter_benchmark.cc b/cpp/benchmarks/label_filter_benchmark.cc new file mode 100644 index 000000000..d575a075e --- /dev/null +++ b/cpp/benchmarks/label_filter_benchmark.cc @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "benchmark/benchmark.h" + +#include "./benchmark_util.h" +#include "graphar/api/high_level_reader.h" +#include "graphar/api/info.h" + +namespace graphar { + +std::shared_ptr SingleLabelFilter( + const std::shared_ptr& graph_info) { + std::string type = "organisation"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::string filter_label = "university"; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithLabel(filter_label, graph_info, type); + auto filter_vertices = maybe_filter_vertices_collection.value(); + return filter_vertices; +} + +void SingleLabelFilterbyAcero( + const std::shared_ptr& graph_info) { + std::string type = "organisation"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::string filter_label = "university"; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithLabelbyAcero(filter_label, graph_info, + type); + auto filter_vertices = maybe_filter_vertices_collection.value(); +} + +void MultiLabelFilter(const std::shared_ptr& graph_info) { + std::string type = "organisation"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::vector filter_label = {"university", "company"}; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithMultipleLabels(filter_label, graph_info, + type); + auto filter_vertices = maybe_filter_vertices_collection.value(); +} + +void MultiLabelFilterbyAcero( + const std::shared_ptr& graph_info) { + std::string type = "organisation"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::vector filter_label = {"university", "company"}; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithMultipleLabelsbyAcero(filter_label, + graph_info, type); + auto filter_vertices = maybe_filter_vertices_collection.value(); +} + +std::shared_ptr LabelFilterFromSet( + const std::shared_ptr& graph_info, + const std::shared_ptr& vertices_collection) { + std::string type = "organisation"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::vector filter_label = {"company", "public"}; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithMultipleLabels(filter_label, + vertices_collection); + auto filter_vertices = maybe_filter_vertices_collection.value(); + return filter_vertices; +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilter) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + SingleLabelFilter(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilterbyAcero) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + SingleLabelFilterbyAcero(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilter) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + MultiLabelFilter(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilterbyAcero) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + MultiLabelFilterbyAcero(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, LabelFilterFromSet) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + state.PauseTiming(); + auto vertices_collection = SingleLabelFilter(second_graph_info_); + auto vertices_collection_2 = + LabelFilterFromSet(second_graph_info_, vertices_collection); + state.ResumeTiming(); + LabelFilterFromSet(second_graph_info_, vertices_collection_2); + } +} + +BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilter)->Iterations(10); +BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilterbyAcero) + ->Iterations(10); +BENCHMARK_REGISTER_F(BenchmarkFixture, MultiLabelFilter)->Iterations(10); +BENCHMARK_REGISTER_F(BenchmarkFixture, MultiLabelFilterbyAcero)->Iterations(10); +BENCHMARK_REGISTER_F(BenchmarkFixture, LabelFilterFromSet)->Iterations(10); + +} // namespace graphar diff --git a/cpp/examples/label_filtering_example.cc b/cpp/examples/label_filtering_example.cc new file mode 100644 index 000000000..e519bdda6 --- /dev/null +++ b/cpp/examples/label_filtering_example.cc @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "arrow/api.h" +#include "arrow/filesystem/api.h" + +#include "./config.h" +#include "graphar/api/arrow_reader.h" +#include "graphar/api/high_level_reader.h" + +void vertices_collection( + const std::shared_ptr& graph_info) { + std::string type = "organisation"; + auto vertex_info = graph_info->GetVertexInfo("organisation"); + auto labels = vertex_info->GetLabels(); + + std::cout << "Query vertices with a specific label" << std::endl; + std::cout << "--------------------------------------" << std::endl; + + auto maybe_filter_vertices_collection = + graphar::VerticesCollection::verticesWithLabel(std::string("company"), + graph_info, type); + + ASSERT(!maybe_filter_vertices_collection.has_error()); + auto filter_vertices = maybe_filter_vertices_collection.value(); + std::cout << "valid vertices num: " << filter_vertices->size() << std::endl; + + std::cout << std::endl; + std::cout << "Query vertices with specific label in a filtered vertices set" + << std::endl; + std::cout << "--------------------------------------" << std::endl; + + auto maybe_filter_vertices_collection_2 = + graphar::VerticesCollection::verticesWithLabel(std::string("public"), + filter_vertices); + ASSERT(!maybe_filter_vertices_collection_2.has_error()); + auto filter_vertices_2 = maybe_filter_vertices_collection_2.value(); + std::cout << "valid vertices num: " << filter_vertices_2->size() << std::endl; + + std::cout << std::endl; + std::cout << "Test vertices with multi labels" << std::endl; + std::cout << "--------------------------------------" << std::endl; + auto maybe_filter_vertices_collection_3 = + graphar::VerticesCollection::verticesWithMultipleLabels( + {"company", "public"}, graph_info, type); + ASSERT(!maybe_filter_vertices_collection_3.has_error()); + auto filter_vertices_3 = maybe_filter_vertices_collection_3.value(); + std::cout << "valid vertices num: " << filter_vertices_3->size() << std::endl; + + for (auto it = filter_vertices_3->begin(); it != filter_vertices_3->end(); + ++it) { + // get a node's all labels + auto label_result = it.label(); + std::cout << "id: " << it.id() << " "; + if (!label_result.has_error()) { + for (auto label : label_result.value()) { + std::cout << label << " "; + } + } + std::cout << "name: "; + auto property = it.property("name").value(); + std::cout << property << " "; + std::cout << std::endl; + } +} + +int main(int argc, char* argv[]) { + // read file and construct graph info + std::string path = GetTestingResourceRoot() + "/ldbc/parquet/ldbc.graph.yml"; + auto graph_info = graphar::GraphInfo::Load(path).value(); + + // vertices collection + std::cout << "Vertices collection" << std::endl; + std::cout << "-------------------" << std::endl; + vertices_collection(graph_info); + std::cout << std::endl; +} diff --git a/cpp/src/graphar/high-level/graph_reader.cc b/cpp/src/graphar/high-level/graph_reader.cc index 9d9857d3c..2cfe5b36c 100644 --- a/cpp/src/graphar/high-level/graph_reader.cc +++ b/cpp/src/graphar/high-level/graph_reader.cc @@ -17,8 +17,14 @@ * under the License. */ -#include "graphar/high-level/graph_reader.h" +#include +#include + +#include "arrow/array.h" +#include "graphar/api/arrow_reader.h" #include "graphar/convert_to_arrow_type.h" +#include "graphar/high-level/graph_reader.h" +#include "graphar/label.h" #include "graphar/types.h" namespace graphar { @@ -94,6 +100,290 @@ Vertex::Vertex(IdType id, } } +Result VertexIter::hasLabel(const std::string& label) noexcept { + std::shared_ptr column(nullptr); + label_reader_.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk()); + column = util::GetArrowColumnByName(chunk_table, label); + if (column != nullptr) { + auto array = util::GetArrowArrayByChunkIndex(column, 0); + auto bool_array = std::dynamic_pointer_cast(array); + return bool_array->Value(0); + } + return Status::KeyError("label with name ", label, + " does not exist in the vertex."); +} + +Result> VertexIter::label() noexcept { + std::shared_ptr column(nullptr); + std::vector vertex_label; + if (is_filtered_) + label_reader_.seek(filtered_ids_[cur_offset_]); + else + label_reader_.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk()); + for (auto label : labels_) { + column = util::GetArrowColumnByName(chunk_table, label); + if (column != nullptr) { + auto array = util::GetArrowArrayByChunkIndex(column, 0); + auto bool_array = std::dynamic_pointer_cast(array); + if (bool_array->Value(0)) { + vertex_label.push_back(label); + } + } + } + return vertex_label; +} + +static inline bool IsValid(bool* state, int column_number) { + for (int i = 0; i < column_number; ++i) { + // AND case + if (!state[i]) + return false; + // OR case + // if (state[i]) return true; + } + // AND case + return true; + // OR case + // return false; +} + +Result> VerticesCollection::filter( + std::vector filter_labels, + std::vector* new_valid_chunk) { + std::vector indices; + const int TOT_ROWS_NUM = vertex_num_; + const int CHUNK_SIZE = vertex_info_->GetChunkSize(); + const int TOT_LABEL_NUM = labels_.size(); + const int TESTED_LABEL_NUM = filter_labels.size(); + std::vector tested_label_ids; + + for (const auto& filter_label : filter_labels) { + auto it = std::find(labels_.begin(), labels_.end(), filter_label); + if (it != labels_.end()) { + tested_label_ids.push_back(std::distance(labels_.begin(), it)); + } + } + if (tested_label_ids.empty()) + return Status::KeyError( + "query label" + " does not exist in the vertex."); + + uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1]; + memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1)); + int total_count = 0; + int row_num; + + if (is_filtered_) { + for (int chunk_idx : valid_chunk_) { + row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE); + std::string new_filename = + prefix_ + vertex_info_->GetPrefix() + "labels/chunk"; + int count = read_parquet_file_and_get_valid_indices( + new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM, + tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap, + QUERY_TYPE::INDEX); + if (count != 0 && new_valid_chunk != nullptr) + new_valid_chunk->emplace_back(static_cast(chunk_idx)); + } + } else { + for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; + ++chunk_idx) { + row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE); + std::string new_filename = + prefix_ + vertex_info_->GetPrefix() + "labels/chunk"; + int count = read_parquet_file_and_get_valid_indices( + new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM, + tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap, + QUERY_TYPE::INDEX); + if (count != 0) + valid_chunk_.emplace_back(static_cast(chunk_idx)); + } + } + // std::cout << "Total valid count: " << total_count << std::endl; + std::vector indices64; + + for (int value : indices) { + indices64.push_back(static_cast(value)); + } + + delete[] bitmap; + + return indices64; +} + +Result> VerticesCollection::filter_by_acero( + std::vector filter_labels) const { + std::vector indices; + const int TOT_ROWS_NUM = vertex_num_; + const int CHUNK_SIZE = vertex_info_->GetChunkSize(); + + std::vector tested_label_ids; + for (const auto& filter_label : filter_labels) { + auto it = std::find(labels_.begin(), labels_.end(), filter_label); + if (it != labels_.end()) { + tested_label_ids.push_back(std::distance(labels_.begin(), it)); + } + } + int total_count = 0; + int row_num; + std::vector> filters; + std::shared_ptr combined_filter = nullptr; + + for (const auto& label : filter_labels) { + filters.emplace_back( + graphar::_Equal(graphar::_Property(label), graphar::_Literal(true))); + } + + for (const auto& filter : filters) { + if (!combined_filter) { + combined_filter = graphar::_And(filter, filter); + } else { + combined_filter = graphar::_And(combined_filter, filter); + } + } + + auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make( + vertex_info_, labels_, prefix_, {}); + auto filter_reader = maybe_filter_reader.value(); + filter_reader->Filter(combined_filter); + for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) { + auto filter_result = filter_reader->GetLabelChunk(); + auto filter_table = filter_result.value(); + total_count += filter_table->num_rows(); + filter_reader->next_chunk(); + } + // std::cout << "Total valid count: " << total_count << std::endl; + std::vector indices64; + + for (int value : indices) { + indices64.push_back(static_cast(value)); + } + + return indices64; +} + +Result> +VerticesCollection::verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter({filter_label}).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithLabelbyAcero( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter_by_acero({filter_label}).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& vertices_collection) { + auto new_vertices_collection = std::make_shared( + vertices_collection->vertex_info_, vertices_collection->prefix_); + auto filtered_ids = + new_vertices_collection + ->filter({filter_label}, &new_vertices_collection->valid_chunk_) + .value(); + if (vertices_collection->is_filtered_) { + std::unordered_set origin_set( + vertices_collection->filtered_ids_.begin(), + vertices_collection->filtered_ids_.end()); + std::unordered_set intersection; + for (int num : filtered_ids) { + if (origin_set.count(num)) { + intersection.insert(num); + } + } + filtered_ids = + std::vector(intersection.begin(), intersection.end()); + + new_vertices_collection->is_filtered_ = true; + } + new_vertices_collection->filtered_ids_ = filtered_ids; + + return new_vertices_collection; +} + +Result> +VerticesCollection::verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter(filter_labels).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithMultipleLabelsbyAcero( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter_by_acero(filter_labels).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& vertices_collection) { + auto new_vertices_collection = std::make_shared( + vertices_collection->vertex_info_, vertices_collection->prefix_); + auto filtered_ids = + vertices_collection + ->filter(filter_labels, &new_vertices_collection->valid_chunk_) + .value(); + if (vertices_collection->is_filtered_) { + std::unordered_set origin_set( + vertices_collection->filtered_ids_.begin(), + vertices_collection->filtered_ids_.end()); + std::unordered_set intersection; + for (int num : filtered_ids) { + if (origin_set.count(num)) { + intersection.insert(num); + } + } + filtered_ids = + std::vector(intersection.begin(), intersection.end()); + + new_vertices_collection->is_filtered_ = true; + } + new_vertices_collection->filtered_ids_ = filtered_ids; + + return new_vertices_collection; +} + template Result Vertex::property(const std::string& property) const { if constexpr (std::is_final::value) { diff --git a/cpp/src/graphar/high-level/graph_reader.h b/cpp/src/graphar/high-level/graph_reader.h index 20e7d8f8d..19c8f716a 100644 --- a/cpp/src/graphar/high-level/graph_reader.h +++ b/cpp/src/graphar/high-level/graph_reader.h @@ -74,6 +74,13 @@ class Vertex { template Result property(const std::string& property) const; + /** + * @brief Get the label of the vertex. + * @return Result: The label of the vertex. + */ + template + Result label() const; + /** * @brief Return true if value at the property is valid (not null). * @@ -172,40 +179,80 @@ class VertexIter { * @param offset The current offset of the readers. */ explicit VertexIter(const std::shared_ptr& vertex_info, - const std::string& prefix, IdType offset) noexcept { + const std::string& prefix, IdType offset, + const std::vector& labels, + const bool& is_filtered = false, + const std::vector& filtered_ids = {}) noexcept { + if (!labels.empty()) { + labels_ = labels; + label_reader_ = + VertexPropertyArrowChunkReader(vertex_info, labels, prefix); + } for (const auto& pg : vertex_info->GetPropertyGroups()) { readers_.emplace_back(vertex_info, pg, prefix); } + is_filtered_ = is_filtered; + filtered_ids_ = filtered_ids; cur_offset_ = offset; } /** Copy constructor. */ VertexIter(const VertexIter& other) - : readers_(other.readers_), cur_offset_(other.cur_offset_) {} + : readers_(other.readers_), + cur_offset_(other.cur_offset_), + labels_(other.labels_), + label_reader_(other.label_reader_), + is_filtered_(other.is_filtered_), + filtered_ids_(other.filtered_ids_) {} /** Construct and return the vertex of the current offset. */ Vertex operator*() noexcept { - for (auto& reader : readers_) { - reader.seek(cur_offset_); + if (is_filtered_) { + for (auto& reader : readers_) { + reader.seek(filtered_ids_[cur_offset_]); + } + } else { + for (auto& reader : readers_) { + reader.seek(cur_offset_); + } } + return Vertex(cur_offset_, readers_); } /** Get the vertex id of the current offset. */ - IdType id() { return cur_offset_; } + IdType id() { + if (is_filtered_) { + return filtered_ids_[cur_offset_]; + } else { + return cur_offset_; + } + } /** Get the value for a property of the current vertex. */ template Result property(const std::string& property) noexcept { std::shared_ptr column(nullptr); - for (auto& reader : readers_) { - reader.seek(cur_offset_); - GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); - column = util::GetArrowColumnByName(chunk_table, property); - if (column != nullptr) { - break; + if (is_filtered_) { + for (auto& reader : readers_) { + reader.seek(filtered_ids_[cur_offset_]); + GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); + column = util::GetArrowColumnByName(chunk_table, property); + if (column != nullptr) { + break; + } + } + } else { + for (auto& reader : readers_) { + reader.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); + column = util::GetArrowColumnByName(chunk_table, property); + if (column != nullptr) { + break; + } } } + if (column != nullptr) { auto array = util::GetArrowArrayByChunkIndex(column, 0); GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array)); @@ -215,6 +262,12 @@ class VertexIter { " does not exist in the vertex."); } + /** Determine whether a vertex has the input label. */ + Result hasLabel(const std::string& label) noexcept; + + /** Get the labels of the current vertex. */ + Result> label() noexcept; + /** The prefix increment operator. */ VertexIter& operator++() noexcept { ++cur_offset_; @@ -253,7 +306,11 @@ class VertexIter { private: std::vector readers_; + VertexPropertyArrowChunkReader label_reader_; + std::vector labels_; IdType cur_offset_; + bool is_filtered_; + std::vector filtered_ids_; }; /** @@ -266,11 +323,18 @@ class VerticesCollection { * @brief Initialize the VerticesCollection. * * @param vertex_info The vertex info that describes the vertex type. + * @param labels The labels of the vertex. * @param prefix The absolute prefix. */ explicit VerticesCollection(const std::shared_ptr& vertex_info, - const std::string& prefix) - : vertex_info_(std::move(vertex_info)), prefix_(prefix) { + const std::string& prefix, + const bool is_filtered = false, + const std::vector filtered_ids = {}) + : vertex_info_(std::move(vertex_info)), + prefix_(prefix), + labels_(vertex_info->GetLabels()), + is_filtered_(is_filtered), + filtered_ids_(filtered_ids) { // get the vertex num std::string base_dir; GAR_ASSIGN_OR_RAISE_ERROR(auto fs, @@ -283,21 +347,104 @@ class VerticesCollection { } /** The iterator pointing to the first vertex. */ - VertexIter begin() noexcept { return VertexIter(vertex_info_, prefix_, 0); } + VertexIter begin() noexcept { + return VertexIter(vertex_info_, prefix_, 0, labels_, is_filtered_, + filtered_ids_); + } /** The iterator pointing to the past-the-end element. */ VertexIter end() noexcept { - return VertexIter(vertex_info_, prefix_, vertex_num_); + if (is_filtered_) + return VertexIter(vertex_info_, prefix_, filtered_ids_.size(), labels_, + is_filtered_, filtered_ids_); + return VertexIter(vertex_info_, prefix_, vertex_num_, labels_, is_filtered_, + filtered_ids_); } /** The iterator pointing to the vertex with specific id. */ - VertexIter find(IdType id) { return VertexIter(vertex_info_, prefix_, id); } + VertexIter find(IdType id) { + return VertexIter(vertex_info_, prefix_, id, labels_); + } /** Get the number of vertices in the collection. */ - size_t size() const noexcept { return vertex_num_; } + size_t size() const noexcept { + if (is_filtered_) + return filtered_ids_.size(); + else + return vertex_num_; + } + + /** The vertex id list that satisfies the label filter condition. */ + Result> filter( + std::vector filter_labels, + std::vector* new_valid_chunk = nullptr); + + Result> filter_by_acero( + std::vector filter_labels) const; + + /** + * @brief Query vertices with a specific label + * + * @param filter_label The label to query vertices by + * @param graph_info A smart pointer to GraphInfo that contains details about + * the graph + * @param type The type of vertices to query + * @return A VerticesCollection containing all vertices that have the + * specified label + */ + static Result> verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type); + + static Result> verticesWithLabelbyAcero( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type); + + /** + * @brief Query vertices with a specific label within a given collection + * + * @param filter_label The label to query vertices by + * @param vertices_collection The collection of vertices to search within + * @return A VerticesCollection containing all vertices from the specified + * collection that have the specified label + */ + static Result> verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& vertices_collection); + + /** + * @brief Query vertices with multiple labels + * + * @param filter_labels A vector of labels to query vertices by + * @param graph_info A smart pointer to GraphInfo that contains details about + * the graph + * @param type The type of vertices to query + * @return A VerticesCollection containing all vertices that have all of the + * specified labels + */ + static Result> verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type); + + static Result> + verticesWithMultipleLabelsbyAcero( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type); + + /** + * @brief Query vertices with multiple labels within a given collection + * + * @param filter_labels A vector of labels to query vertices by + * @param vertices_collection The collection of vertices to search within + * @return A VerticesCollection containing all vertices from the specified + * collection that have all of the specified labels + */ + static Result> verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& vertices_collection); /** - * @brief Construct a VerticesCollection from graph info and vertex type. + * @brief Construct a VerticesCollection from graph info and vertex label. * * @param graph_info The graph info. * @param type The vertex type. @@ -305,6 +452,7 @@ class VerticesCollection { static Result> Make( const std::shared_ptr& graph_info, const std::string& type) { auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); if (!vertex_info) { return Status::KeyError("The vertex ", type, " doesn't exist."); } @@ -315,6 +463,10 @@ class VerticesCollection { private: std::shared_ptr vertex_info_; std::string prefix_; + std::vector labels_; + bool is_filtered_; + std::vector filtered_ids_; + std::vector valid_chunk_; IdType vertex_num_; }; diff --git a/cpp/src/graphar/label.cc b/cpp/src/graphar/label.cc new file mode 100644 index 000000000..e8b23797f --- /dev/null +++ b/cpp/src/graphar/label.cc @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "graphar/label.h" + +#include +#include +#include +#include +#include +#include + +/// Read a parquet file by ParquetReader & get valid indices +/// The first column_num labels are concerned. +int read_parquet_file_and_get_valid_indices( + const char* parquet_filename, const int row_num, const int tot_label_num, + const int tested_label_num, std::vector tested_label_ids, + const std::function& IsValid, int chunk_idx, + int chunk_size, std::vector* indices, uint64_t* bitmap, + const QUERY_TYPE query_type) { + // Create a ParquetReader instance + std::unique_ptr parquet_reader = + parquet::ParquetFileReader::OpenFile( + parquet_filename + std::to_string(chunk_idx), false); + + // Get the File MetaData + std::shared_ptr file_metadata = + parquet_reader->metadata(); + int row_group_count = file_metadata->num_row_groups(); + int num_columns = file_metadata->num_columns(); + + // Initialize the column row counts + std::vector col_row_counts(num_columns, 0); + bool** value = new bool*[num_columns]; + for (int i = 0; i < num_columns; i++) { + value[i] = new bool[row_num]; + } + + // Iterate over all the RowGroups in the file + for (int rg = 0; rg < row_group_count; ++rg) { + // Get the RowGroup Reader + std::shared_ptr row_group_reader = + parquet_reader->RowGroup(rg); + + int64_t values_read = 0; + int64_t rows_read = 0; + std::shared_ptr column_reader; + + ARROW_UNUSED(rows_read); // prevent warning in release build + + // Read the label columns + for (int k = 0; k < tested_label_num; k++) { + int col_id = tested_label_ids[k]; + // Get the Column Reader for the Bool column + column_reader = row_group_reader->Column(col_id); + parquet::BoolReader* bool_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + while (bool_reader->HasNext()) { + // Read BATCH_SIZE values at a time. The number of rows read is + // returned. values_read contains the number of non-null rows + + rows_read = bool_reader->ReadBatch(BATCH_SIZE, nullptr, nullptr, + value[k] + col_row_counts[col_id], + &values_read); + + // There are no NULL values in the rows written + col_row_counts[col_id] += rows_read; + } + } + } + const int kTotLabelNum = tot_label_num; + bool state[kTotLabelNum]; + int count = 0; + int offset = chunk_idx * chunk_size; + for (int i = 0; i < row_num; i++) { + for (int j = 0; j < tested_label_num; j++) { + state[j] = value[j][i]; + } + if (IsValid(state, tested_label_num)) { + count++; + if (query_type == QUERY_TYPE::INDEX) + + indices->push_back(i + offset); + else if (query_type == QUERY_TYPE::BITMAP) + SetBitmap(bitmap, i); + } + } + + // destroy the allocated space + for (int i = 0; i < num_columns; i++) { + delete[] value[i]; + } + delete[] value; + + return count; +} diff --git a/cpp/src/graphar/label.h b/cpp/src/graphar/label.h new file mode 100644 index 000000000..145312e4d --- /dev/null +++ b/cpp/src/graphar/label.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef CPP_SRC_GRAPHAR_LABEL_H_ +#define CPP_SRC_GRAPHAR_LABEL_H_ + +#include +#include +#include +#include +#include + +#include +#include +#include + +using parquet::ConvertedType; +using parquet::Encoding; +using parquet::Repetition; +using parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +constexpr int BATCH_SIZE = 1024; // the batch size + +/// The query type +enum QUERY_TYPE { + COUNT, // return the number of valid vertices + INDEX, // return the indices of valid vertices + BITMAP, // return the bitmap of valid vertices + ADAPTIVE // adaptively return indices or bitmap +}; + +/// Set bit +static inline void SetBitmap(uint64_t* bitmap, const int index) { + bitmap[index >> 6] |= (1ULL << (index & 63)); +} + +int read_parquet_file_and_get_valid_indices( + const char* parquet_filename, const int row_num, const int tot_label_num, + const int tested_label_num, std::vector tested_label_ids, + const std::function& IsValid, int chunk_idx, + int chunk_size, std::vector* indices = nullptr, + uint64_t* bitmap = nullptr, const QUERY_TYPE query_type = COUNT); + +#endif // CPP_SRC_GRAPHAR_LABEL_H_