diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d162392b8..730c46c07 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -458,6 +458,7 @@ if (BUILD_TESTS) add_test(test_chunk_info_reader SRCS test/test_chunk_info_reader.cc) add_test(test_arrow_chunk_reader SRCS test/test_arrow_chunk_reader.cc) add_test(test_graph SRCS test/test_graph.cc) + add_test(test_multi_label SRCS test/test_multi_label.cc) # enable_testing() endif() @@ -476,6 +477,7 @@ if (BUILD_BENCHMARKS) target_link_libraries(${target} PRIVATE benchmark::benchmark_main graphar ${CMAKE_DL_LIBS}) endmacro() add_benchmark(arrow_chunk_reader_benchmark SRCS benchmarks/arrow_chunk_reader_benchmark.cc) + add_benchmark(label_filter_benchmark SRCS benchmarks/label_filter_benchmark.cc) add_benchmark(graph_info_benchmark SRCS benchmarks/graph_info_benchmark.cc) endif() diff --git a/cpp/benchmarks/benchmark_util.h b/cpp/benchmarks/benchmark_util.h index 19cd47377..4d7d9f9ba 100644 --- a/cpp/benchmarks/benchmark_util.h +++ b/cpp/benchmarks/benchmark_util.h @@ -41,6 +41,10 @@ class BenchmarkFixture : public ::benchmark::Fixture { path_ = std::string(c_root) + "/ldbc_sample/parquet/ldbc_sample.graph.yml"; auto maybe_graph_info = GraphInfo::Load(path_); graph_info_ = maybe_graph_info.value(); + + second_path_ = std::string(c_root) + "/ldbc_large/parquet/ldbc.graph.yml"; + auto second_maybe_graph_info = GraphInfo::Load(second_path_); + second_graph_info_ = second_maybe_graph_info.value(); } void TearDown(const ::benchmark::State& state) override {} @@ -48,5 +52,7 @@ class BenchmarkFixture : public ::benchmark::Fixture { protected: std::string path_; std::shared_ptr graph_info_; + std::string second_path_; + std::shared_ptr second_graph_info_; }; } // namespace graphar diff --git a/cpp/benchmarks/label_filter_benchmark.cc b/cpp/benchmarks/label_filter_benchmark.cc new file mode 100644 index 000000000..922c4ac82 --- /dev/null +++ b/cpp/benchmarks/label_filter_benchmark.cc @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "benchmark/benchmark.h" + +#include "./benchmark_util.h" +#include "graphar/api/high_level_reader.h" +#include "graphar/api/info.h" + +namespace graphar { + +std::shared_ptr SingleLabelFilter(const std::shared_ptr& graph_info) { + std::string type = "comment"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::string filter_label = "Chrome"; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithLabel(filter_label, graph_info, type); + auto filter_vertices = maybe_filter_vertices_collection.value(); + return filter_vertices; +} + +void SingleLabelFilterbyAcero( + const std::shared_ptr& graph_info) { + std::string type = "comment"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::string filter_label = "Chrome"; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithLabelbyAcero(filter_label, graph_info, + type); + auto filter_vertices = maybe_filter_vertices_collection.value(); +} + +void MultiLabelFilter(const std::shared_ptr& graph_info) { + std::string type = "comment"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::vector filter_label = {"Chrome", "Firefox"}; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithMultipleLabels(filter_label, graph_info, + type); + auto filter_vertices = maybe_filter_vertices_collection.value(); +} + +void MultiLabelFilterbyAcero( + const std::shared_ptr& graph_info) { + std::string type = "comment"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::vector filter_label = {"Chrome", "Firefox"}; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithMultipleLabelsbyAcero(filter_label, + graph_info, type); + auto filter_vertices = maybe_filter_vertices_collection.value(); +} + +void LabelFilterFromSet(const std::shared_ptr& graph_info, + const std::shared_ptr& vertices_collection) { + std::string type = "comment"; + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + std::vector filter_label = { "Firefox", "Internet Explorer"}; + auto maybe_filter_vertices_collection = + VerticesCollection::verticesWithMultipleLabels(filter_label, + vertices_collection); + auto filter_vertices = maybe_filter_vertices_collection.value(); +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilter) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + SingleLabelFilter(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilterbyAcero) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + SingleLabelFilterbyAcero(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilter) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + MultiLabelFilter(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilterbyAcero) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + MultiLabelFilterbyAcero(second_graph_info_); + } +} + +BENCHMARK_DEFINE_F(BenchmarkFixture, LabelFilterFromSet) +(::benchmark::State& state) { // NOLINT + for (auto _ : state) { + state.PauseTiming(); + auto vertices_collection = SingleLabelFilter(second_graph_info_); + state.ResumeTiming(); + LabelFilterFromSet(second_graph_info_, vertices_collection); + } +} + +BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilter)->Iterations(10); +// BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilterbyAcero) +// ->Iterations(10); +// BENCHMARK_REGISTER_F(BenchmarkFixture, MultiLabelFilter)->Iterations(10); +// BENCHMARK_REGISTER_F(BenchmarkFixture, MultiLabelFilterbyAcero)->Iterations(10); +BENCHMARK_REGISTER_F(BenchmarkFixture, LabelFilterFromSet)->Iterations(10); + +} // namespace graphar diff --git a/cpp/examples/bgl_example.cc b/cpp/examples/bgl_example.cc index d8231c6f4..6cbb574d0 100644 --- a/cpp/examples/bgl_example.cc +++ b/cpp/examples/bgl_example.cc @@ -112,7 +112,7 @@ int main(int argc, char* argv[]) { int chunk_size = 100; auto version = graphar::InfoVersion::Parse("gar/v1").value(); auto new_info = graphar::CreateVertexInfo(vertex_type, chunk_size, {group}, - vertex_prefix, version); + {}, vertex_prefix, version); // dump new vertex info ASSERT(new_info->IsValidated()); ASSERT(new_info->Dump().status().ok()); diff --git a/cpp/examples/construct_info_example.cc b/cpp/examples/construct_info_example.cc index d5b390b7d..b9c0a2761 100644 --- a/cpp/examples/construct_info_example.cc +++ b/cpp/examples/construct_info_example.cc @@ -43,7 +43,7 @@ int main(int argc, char* argv[]) { graphar::CreatePropertyGroup(property_vector_2, graphar::FileType::ORC); // create vertex info - auto vertex_info = graphar::CreateVertexInfo(type, chunk_size, {group1}, + auto vertex_info = graphar::CreateVertexInfo(type, chunk_size, {group1}, {}, vertex_prefix, version); ASSERT(vertex_info != nullptr); @@ -150,7 +150,7 @@ int main(int argc, char* argv[]) { // create graph info auto graph_info = graphar::CreateGraphInfo(name, {vertex_info}, {edge_info}, - prefix, version); + {}, prefix, version); ASSERT(graph_info->GetName() == name); ASSERT(graph_info->GetPrefix() == prefix); ASSERT(graph_info->GetVertexInfos().size() == 1); diff --git a/cpp/examples/high_level_label_reader_example.cc b/cpp/examples/high_level_label_reader_example.cc new file mode 100644 index 000000000..9b9bcf4d8 --- /dev/null +++ b/cpp/examples/high_level_label_reader_example.cc @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "arrow/api.h" +#include "arrow/filesystem/api.h" + +#include "./config.h" +#include "graphar/api/high_level_reader.h" + +void vertices_collection( + const std::shared_ptr& graph_info) { + std::string type = "organisation"; + auto vertex_info = graph_info->GetVertexInfo("organisation"); + auto labels = vertex_info->GetLabels(); + + std::cout << "Query vertices with a specific label" << std::endl; + std::cout << "--------------------------------------" << std::endl; + + auto maybe_filter_vertices_collection = + graphar::VerticesCollection::verticesWithLabel(std::string("company"), + graph_info, type); + + ASSERT(!maybe_filter_vertices_collection.has_error()); + auto filter_vertices = maybe_filter_vertices_collection.value(); + std::cout << "valid vertices num: " << filter_vertices->size() << std::endl; + + // for (auto it = filter_vertices->begin(); it != filter_vertices->end(); + // ++it) { + // // get a node's all labels + // auto label_result = it.label(); + // std::cout<< "id: " << it.id()<<" "; + // if (!label_result.has_error()) { + // for (auto label : label_result.value()) { + // std::cout << label << " "; + // } + // } + // std::cout << "name: "; + // auto property = it.property("name").value(); + // std::cout << property << " "; + // std::cout<size() << std::endl; + + // for (auto it = filter_vertices_2->begin(); it != filter_vertices_2->end(); + // ++it) { + // auto label_result = it.label(); + // std::cout<< "id: " << it.id()<<" "; + // if (!label_result.has_error()) { + // for (auto label : label_result.value()) { + // std::cout << label << " "; + // } + // } + // std::cout << "name: "; + // auto property = it.property("name").value(); + // std::cout << property << " "; + // std::cout<size() << std::endl; + + // for (auto it = filter_vertices_3->begin(); it != filter_vertices_3->end(); + // ++it) { + // // get a node's all labels + // auto label_result = it.label(); + // std::cout<< "id: " << it.id()<<" "; + // if (!label_result.has_error()) { + // for (auto label : label_result.value()) { + // std::cout << label << " "; + // } + // } + // std::cout << "name: "; + // auto property = it.property("name").value(); + // std::cout << property << " "; + // std::cout<Dump().has_error()); ASSERT(graph_info->Save(save_path + graph_name + ".graph.yml").ok()); diff --git a/cpp/src/graphar/arrow/chunk_reader.cc b/cpp/src/graphar/arrow/chunk_reader.cc index f4add8a40..649c0a651 100644 --- a/cpp/src/graphar/arrow/chunk_reader.cc +++ b/cpp/src/graphar/arrow/chunk_reader.cc @@ -17,11 +17,12 @@ * under the License. */ +#include #include #include "arrow/api.h" #include "arrow/compute/api.h" - +#include "chunk_reader.h" #include "graphar/arrow/chunk_reader.h" #include "graphar/filesystem.h" #include "graphar/general_params.h" @@ -31,7 +32,6 @@ #include "graphar/status.h" #include "graphar/types.h" #include "graphar/util.h" - namespace graphar { namespace { @@ -51,6 +51,18 @@ Result> PropertyGroupToSchema( return arrow::schema(fields); } +Result> LabelToSchema( + std::vector labels, bool contain_index_column = false) { + std::vector> fields; + if (contain_index_column) { + fields.push_back(std::make_shared( + GeneralParams::kVertexIndexCol, arrow::int64())); + } + for (const auto& lab : labels) { + fields.push_back(std::make_shared(lab, arrow::boolean())); + } + return arrow::schema(fields); +} Status GeneralCast(const std::shared_ptr& in, const std::shared_ptr& to_type, std::shared_ptr* out) { @@ -148,6 +160,30 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( PropertyGroupToSchema(property_group_, true)); } +// initialize for labels +VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( + const std::shared_ptr& vertex_info, + const std::vector& labels, const std::string& prefix, + const util::FilterOptions& options) + : vertex_info_(std::move(vertex_info)), + labels_(labels), + chunk_index_(0), + seek_id_(0), + schema_(nullptr), + chunk_table_(nullptr), + filter_options_(options) { + GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); + // GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix, + // vertex_info->GetPathPrefix(property_group)); + std::string base_dir = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" + + std::to_string(chunk_index_); + GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_, + util::GetVertexChunkNum(prefix_, vertex_info)); + GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_, + util::GetVertexNum(prefix_, vertex_info_)); + GAR_ASSIGN_OR_RAISE_ERROR(schema_, LabelToSchema(labels)); +} + Status VertexPropertyArrowChunkReader::seek(IdType id) { seek_id_ = id; IdType pre_chunk_index = chunk_index_; @@ -186,6 +222,33 @@ VertexPropertyArrowChunkReader::GetChunk() { return chunk_table_->Slice(row_offset); } +Result> +VertexPropertyArrowChunkReader::GetLabelChunk() { + // GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, + // property_group_)); + FileType filetype = FileType::PARQUET; + // TODO(elssky): rebuild GetFilePath for label chunks + if (chunk_table_ == nullptr) { + // GAR_ASSIGN_OR_RAISE( + // auto chunk_file_path, + // vertex_info_->GetFilePath(property_group_, chunk_index_)); + std::string path = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" + + std::to_string(chunk_index_); + // std::string path = prefix_ + chunk_file_path; + GAR_ASSIGN_OR_RAISE(chunk_table_, + fs_->ReadFileToTable(path, filetype, filter_options_)); + // TODO(acezen): filter pushdown doesn't support cast schema now + // if (schema_ != nullptr && filter_options_.filter == nullptr) { + // GAR_RETURN_NOT_OK( + // CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); + // } + } + // std::cout<column(0)->type()->ToString()<column(1)->type()->ToString()<GetChunkSize(); + return chunk_table_->Slice(row_offset); +} + Status VertexPropertyArrowChunkReader::next_chunk() { if (++chunk_index_ >= chunk_num_) { return Status::IndexError( @@ -215,6 +278,16 @@ VertexPropertyArrowChunkReader::Make( vertex_info, property_group, prefix, options); } +// Make for labels +Result> +VertexPropertyArrowChunkReader::Make( + const std::shared_ptr& vertex_info, + const std::vector& labels, const std::string& prefix, + const util::FilterOptions& options) { + return std::make_shared(vertex_info, labels, + prefix, options); +} + Result> VertexPropertyArrowChunkReader::Make( const std::shared_ptr& graph_info, const std::string& type, diff --git a/cpp/src/graphar/arrow/chunk_reader.h b/cpp/src/graphar/arrow/chunk_reader.h index b204753bd..882a4ee64 100644 --- a/cpp/src/graphar/arrow/chunk_reader.h +++ b/cpp/src/graphar/arrow/chunk_reader.h @@ -53,6 +53,20 @@ class VertexPropertyArrowChunkReader { const std::shared_ptr& property_group, const std::string& prefix, const util::FilterOptions& options = {}); + // 添加无参构造函数 + VertexPropertyArrowChunkReader() : vertex_info_(nullptr), prefix_("") {} + + /** + * @brief Initialize the VertexPropertyArrowChunkReader. + * + * @param vertex_info The vertex info that describes the vertex type. + * @param labels The labels of the vertex type. + * @param prefix The absolute prefix. + */ + VertexPropertyArrowChunkReader(const std::shared_ptr& vertex_info, + const std::vector& labels, + const std::string& prefix, + const util::FilterOptions& options = {}); /** * @brief Sets chunk position indicator for reader by internal vertex id. * If internal vertex id is not found, will return Status::IndexError @@ -67,12 +81,17 @@ class VertexPropertyArrowChunkReader { * @brief Return the current arrow chunk table of chunk position indicator. */ Result> GetChunk(); - + /** + * @brief Return the current arrow label chunk table of chunk position + * indicator. + */ + Result> GetLabelChunk(); /** * @brief Sets chunk position indicator to next chunk. * * if current chunk is the last chunk, will return Status::IndexError error. */ + Status next_chunk(); /** @@ -109,6 +128,19 @@ class VertexPropertyArrowChunkReader { const std::shared_ptr& property_group, const std::string& prefix, const util::FilterOptions& options = {}); + /** + * @brief Create a VertexPropertyArrowChunkReader instance from vertex info + * for labels. + * + * @param vertex_info The vertex info. + * @param prefix The absolute prefix of the graph. + * @param options The filter options, default is empty. + */ + static Result> Make( + const std::shared_ptr& vertex_info, + const std::vector& labels, const std::string& prefix, + const util::FilterOptions& options); + /** * @brief Create a VertexPropertyArrowChunkReader instance from graph info and * property group. @@ -142,6 +174,7 @@ class VertexPropertyArrowChunkReader { std::shared_ptr vertex_info_; std::shared_ptr property_group_; std::string prefix_; + std::vector labels_; IdType chunk_index_; IdType seek_id_; IdType chunk_num_; diff --git a/cpp/src/graphar/arrow/chunk_writer.cc b/cpp/src/graphar/arrow/chunk_writer.cc index 2e3e73539..272a5713e 100644 --- a/cpp/src/graphar/arrow/chunk_writer.cc +++ b/cpp/src/graphar/arrow/chunk_writer.cc @@ -33,6 +33,7 @@ #include "arrow/dataset/plan.h" #include "arrow/dataset/scanner.h" +#include "chunk_writer.h" #include "graphar/arrow/chunk_writer.h" #include "graphar/filesystem.h" #include "graphar/general_params.h" @@ -251,6 +252,23 @@ Status VertexPropertyWriter::WriteChunk( return Status::OK(); } +Status VertexPropertyWriter::WriteLabelChunk( + const std::shared_ptr& input_table, IdType chunk_index, + FileType file_type, ValidateLevel validate_level) const { + auto schema = input_table->schema(); + std::vector indices; + for (int i = 0; i < schema->num_fields(); i++) { + indices.push_back(i); + } + + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table, + input_table->SelectColumns(indices)); + std::string suffix = + vertex_info_->GetPrefix() + "labels/chunk" + std::to_string(chunk_index); + std::string path = prefix_ + suffix; + return fs_->WriteTableToFile(input_table, file_type, path); +} + Status VertexPropertyWriter::WriteTable( const std::shared_ptr& input_table, const std::shared_ptr& property_group, @@ -287,9 +305,116 @@ Status VertexPropertyWriter::WriteTable( GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group, start_chunk_index, validate_level)); } + auto labels = vertex_info_->GetLabels(); + if (!labels.empty()) { + GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTable(input_table, labels)) + // WARNING!!! WARNING!!! WARNING!!! This is using for experiments + // GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTableAndRandomlyAddLabels + // (input_table, labels)) + GAR_RETURN_NOT_OK(WriteLabelTable(label_table, start_chunk_index, + FileType::PARQUET, validate_level)); + }; + return Status::OK(); } +// Helper function to split a string by a delimiter +std::vector SplitString(const std::string& str, char delimiter) { + std::vector tokens; + std::string token; + std::istringstream tokenStream(str); + while (std::getline(tokenStream, token, delimiter)) { + tokens.push_back(token); + } + return tokens; +} + +Status VertexPropertyWriter::WriteLabelTable( + const std::shared_ptr& input_table, IdType start_chunk_index, + FileType file_type, ValidateLevel validate_level) const { + auto schema = input_table->schema(); + int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol); + IdType chunk_size = vertex_info_->GetChunkSize(); + int64_t length = input_table->num_rows(); + IdType chunk_index = start_chunk_index; + for (int64_t offset = 0; offset < length; + offset += chunk_size, chunk_index++) { + auto in_chunk = input_table->Slice(offset, chunk_size); + GAR_RETURN_NOT_OK( + WriteLabelChunk(in_chunk, chunk_index, file_type, validate_level)); + } + return Status::OK(); +} + +Result> VertexPropertyWriter::GetLabelTable( + const std::shared_ptr& input_table, + const std::vector& labels) const { + // Find the label column index + auto label_col_idx = + input_table->schema()->GetFieldIndex(GeneralParams::kLabelCol); + if (label_col_idx == -1) { + return Status::KeyError("label column not found in the input table."); + } + + // Create a matrix of booleans with dimensions [number of rows, number of + // labels] + std::vector> bool_matrix( + input_table->num_rows(), std::vector(labels.size(), false)); + + // Create a map for labels to column indices + std::unordered_map label_to_index; + for (size_t i = 0; i < labels.size(); ++i) { + label_to_index[labels[i]] = i; + } + + int row_offset = 0; // Offset for where to fill the bool_matrix + // Iterate through each chunk of the :LABEL column + for (int64_t chunk_idx = 0; + chunk_idx < input_table->column(label_col_idx)->num_chunks(); + ++chunk_idx) { + auto chunk = input_table->column(label_col_idx)->chunk(chunk_idx); + auto label_column = std::static_pointer_cast(chunk); + + // Populate the matrix based on :LABEL column values + for (int64_t row = 0; row < label_column->length(); ++row) { + if (label_column->IsValid(row)) { + std::string labels_string = label_column->GetString(row); + auto row_labels = SplitString(labels_string, ';'); + for (const auto& lbl : row_labels) { + if (label_to_index.find(lbl) != label_to_index.end()) { + bool_matrix[row_offset + row][label_to_index[lbl]] = true; + } + } + } + } + + row_offset += + label_column->length(); // Update the row offset for the next chunk + } + + // Create Arrow arrays for each label column + arrow::FieldVector fields; + arrow::ArrayVector arrays; + + for (const auto& label : labels) { + arrow::BooleanBuilder builder; + for (const auto& row : bool_matrix) { + builder.Append(row[label_to_index[label]]); + } + + std::shared_ptr array; + builder.Finish(&array); + fields.push_back(arrow::field(label, arrow::boolean())); + arrays.push_back(array); + } + + // Create the Arrow Table with the boolean columns + auto schema = std::make_shared(fields); + auto result_table = arrow::Table::Make(schema, arrays); + + return result_table; +} + Result> VertexPropertyWriter::Make( const std::shared_ptr& vertex_info, const std::string& prefix, const ValidateLevel& validate_level) { diff --git a/cpp/src/graphar/arrow/chunk_writer.h b/cpp/src/graphar/arrow/chunk_writer.h index 65a8813ef..23c7f415f 100644 --- a/cpp/src/graphar/arrow/chunk_writer.h +++ b/cpp/src/graphar/arrow/chunk_writer.h @@ -117,6 +117,21 @@ class VertexPropertyWriter { const std::shared_ptr& property_group, IdType chunk_index, ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** + * @brief Write all labels of a single vertex chunk + * to corresponding files. + * + * @param input_table The table containing data. + * @param chunk_index The index of the vertex chunk. + * @param validate_level The validate level for this operation, + * which is the writer's validate level by default. + * @return Status: ok or error. + */ + Status WriteLabelChunk( + const std::shared_ptr& input_table, IdType chunk_index, + FileType file_type, + ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** * @brief Write all property groups of a single vertex chunk * to corresponding files. @@ -163,6 +178,35 @@ class VertexPropertyWriter { IdType start_chunk_index, ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** + * @brief Write all labels for multiple vertex chunks + * to corresponding files. + * + * @param input_table The table containing data. + * @param start_chunk_index The start index of the vertex chunks. + * @param validate_level The validate level for this operation, + * which is the writer's validate level by default. + * @return Status: ok or error. + */ + Status WriteLabelTable( + const std::shared_ptr& input_table, + IdType start_chunk_index, FileType file_type, + ValidateLevel validate_level = ValidateLevel::default_validate) const; + + /** + * @brief Get label column from table to formulate label table + * @param input_table The table containing data. + * @param labels The labels. + * @return The table only containing label columns + * */ + Result> GetLabelTable( + const std::shared_ptr& input_table, + const std::vector& labels) const; + + Result> GetLabelTableAndRandomlyAddLabels( + const std::shared_ptr& input_table, + const std::vector& labels) const; + /** * @brief Construct a VertexPropertyWriter from vertex info. * diff --git a/cpp/src/graphar/filesystem.cc b/cpp/src/graphar/filesystem.cc index 62b9ab2e8..7802517e5 100644 --- a/cpp/src/graphar/filesystem.cc +++ b/cpp/src/graphar/filesystem.cc @@ -32,10 +32,10 @@ #include "parquet/arrow/writer.h" #include "simple-uri-parser/uri_parser.h" +#include #include "graphar/expression.h" #include "graphar/filesystem.h" #include "graphar/fwd.h" - namespace graphar::detail { template static Status CastToLargeOffsetArray( @@ -131,6 +131,7 @@ Result> FileSystem::ReadFileToTable( // some places, e.g., in vineyard for (int i = 0; i < table->num_columns(); ++i) { std::shared_ptr type = table->column(i)->type(); + // std::cout<< type->ToString() <id() == arrow::Type::STRING) { type = arrow::large_utf8(); } else if (type->id() == arrow::Type::BINARY) { @@ -233,8 +234,14 @@ Status FileSystem::WriteTableToFile(const std::shared_ptr& table, break; } case FileType::PARQUET: { + auto schema = table->schema(); + auto column_num = schema->num_fields(); parquet::WriterProperties::Builder builder; - builder.compression(arrow::Compression::type::ZSTD); // enable compression + // builder.compression(arrow::Compression::type::ZSTD); // enable + // compression + for (int i = 0; i < column_num; ++i) { + builder.encoding(schema->field(i)->name(), parquet::Encoding::RLE); + } RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable( *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024, builder.build(), parquet::default_arrow_writer_properties())); diff --git a/cpp/src/graphar/fwd.h b/cpp/src/graphar/fwd.h index 0c0190399..0ee92384b 100644 --- a/cpp/src/graphar/fwd.h +++ b/cpp/src/graphar/fwd.h @@ -133,7 +133,8 @@ std::shared_ptr CreateAdjacentList( */ std::shared_ptr CreateVertexInfo( const std::string& type, IdType chunk_size, - const PropertyGroupVector& property_groups, const std::string& prefix = "", + const PropertyGroupVector& property_groups, + const std::vector& labels = {}, const std::string& prefix = "", std::shared_ptr version = nullptr); /** @@ -167,6 +168,7 @@ std::shared_ptr CreateEdgeInfo( * @param name The name of the graph * @param vertex_infos The vertex info vector of the graph * @param edge_infos The edge info vector of the graph + * @param labels The vertex labels of the graph. * @param prefix The absolute path prefix to store chunk files of the graph. * Defaults to "./" * @param version The version of the graph info @@ -175,7 +177,8 @@ std::shared_ptr CreateEdgeInfo( */ std::shared_ptr CreateGraphInfo( const std::string& name, const VertexInfoVector& vertex_infos, - const EdgeInfoVector& edge_infos, const std::string& prefix, + const EdgeInfoVector& edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version = nullptr, const std::unordered_map& extra_info = {}); @@ -208,4 +211,4 @@ struct FilterOptions; using Filter = std::shared_ptr; using ColumnNames = std::optional>>; -} // namespace graphar::util +} // namespace graphar::util \ No newline at end of file diff --git a/cpp/src/graphar/general_params.h b/cpp/src/graphar/general_params.h index 55db90fb9..da9ed7acf 100644 --- a/cpp/src/graphar/general_params.h +++ b/cpp/src/graphar/general_params.h @@ -27,6 +27,7 @@ struct GeneralParams { static constexpr const char* kDstIndexCol = "_graphArDstIndex"; static constexpr const char* kOffsetCol = "_graphArOffset"; static constexpr const char* kPrimaryCol = "_graphArPrimary"; + static constexpr const char* kLabelCol = ":LABEL"; }; } // namespace graphar diff --git a/cpp/src/graphar/graph_info.cc b/cpp/src/graphar/graph_info.cc index 8c8aa3c20..81d1bba70 100644 --- a/cpp/src/graphar/graph_info.cc +++ b/cpp/src/graphar/graph_info.cc @@ -191,10 +191,12 @@ class VertexInfo::Impl { public: Impl(const std::string& type, IdType chunk_size, const std::string& prefix, const PropertyGroupVector& property_groups, + const std::vector& labels, std::shared_ptr version) : type_(type), chunk_size_(chunk_size), property_groups_(std::move(property_groups)), + labels_(labels), prefix_(prefix), version_(std::move(version)) { if (prefix_.empty()) { @@ -241,6 +243,7 @@ class VertexInfo::Impl { std::string type_; IdType chunk_size_; PropertyGroupVector property_groups_; + std::vector labels_; std::string prefix_; std::shared_ptr version_; std::unordered_map property_name_to_index_; @@ -252,9 +255,11 @@ class VertexInfo::Impl { VertexInfo::VertexInfo(const std::string& type, IdType chunk_size, const PropertyGroupVector& property_groups, + const std::vector& labels, const std::string& prefix, std::shared_ptr version) - : impl_(new Impl(type, chunk_size, prefix, property_groups, version)) {} + : impl_(new Impl(type, chunk_size, prefix, property_groups, labels, + version)) {} VertexInfo::~VertexInfo() = default; @@ -264,6 +269,10 @@ IdType VertexInfo::GetChunkSize() const { return impl_->chunk_size_; } const std::string& VertexInfo::GetPrefix() const { return impl_->prefix_; } +const std::vector& VertexInfo::GetLabels() const { + return impl_->labels_; +} + const std::shared_ptr& VertexInfo::version() const { return impl_->version_; } @@ -367,21 +376,22 @@ Result> VertexInfo::AddPropertyGroup( } return std::make_shared( impl_->type_, impl_->chunk_size_, - AddVectorElement(impl_->property_groups_, property_group), impl_->prefix_, - impl_->version_); + AddVectorElement(impl_->property_groups_, property_group), impl_->labels_, + impl_->prefix_, impl_->version_); } bool VertexInfo::IsValidated() const { return impl_->is_validated(); } std::shared_ptr CreateVertexInfo( const std::string& type, IdType chunk_size, - const PropertyGroupVector& property_groups, const std::string& prefix, + const PropertyGroupVector& property_groups, + const std::vector& labels, const std::string& prefix, std::shared_ptr version) { if (type.empty() || chunk_size <= 0) { return nullptr; } - return std::make_shared(type, chunk_size, property_groups, prefix, - version); + return std::make_shared(type, chunk_size, property_groups, labels, + prefix, version); } Result> VertexInfo::Load( @@ -396,6 +406,13 @@ Result> VertexInfo::Load( if (!yaml->operator[]("prefix").IsNone()) { prefix = yaml->operator[]("prefix").As(); } + std::vector labels; + const auto& labels_node = yaml->operator[]("labels"); + if (labels_node.IsSequence()) { + for (auto it = labels_node.Begin(); it != labels_node.End(); it++) { + labels.push_back((*it).second.As()); + } + } std::shared_ptr version = nullptr; if (!yaml->operator[]("version").IsNone()) { GAR_ASSIGN_OR_RAISE( @@ -430,8 +447,8 @@ Result> VertexInfo::Load( std::make_shared(property_vec, file_type, pg_prefix)); } } - return std::make_shared(type, chunk_size, property_groups, prefix, - version); + return std::make_shared(type, chunk_size, property_groups, labels, + prefix, version); } Result> VertexInfo::Load(const std::string& input) { @@ -449,6 +466,13 @@ Result VertexInfo::Dump() const noexcept { node["type"] = impl_->type_; node["chunk_size"] = std::to_string(impl_->chunk_size_); node["prefix"] = impl_->prefix_; + if (impl_->labels_.size() > 0) { + node["labels"]; + for (const auto& label : impl_->labels_) { + node["labels"].PushBack(); + node["labels"][node["labels"].Size() - 1] = label; + } + } for (const auto& pg : impl_->property_groups_) { ::Yaml::Node pg_node; if (!pg->GetPrefix().empty()) { @@ -1042,8 +1066,18 @@ static Result> ConstructGraphInfo( edge_infos.push_back(edge_info); } } - return std::make_shared(name, vertex_infos, edge_infos, prefix, - version, extra_info); + + std::vector labels; + if (!graph_meta->operator[]("labels").IsNone()) { + const auto& labels_node = graph_meta->operator[]("labels"); + if (labels_node.IsSequence()) { + for (auto it = labels_node.Begin(); it != labels_node.End(); it++) { + labels.push_back((*it).second.As()); + } + } + } + return std::make_shared(name, vertex_infos, edge_infos, labels, + prefix, version, extra_info); } } // namespace @@ -1051,12 +1085,13 @@ static Result> ConstructGraphInfo( class GraphInfo::Impl { public: Impl(const std::string& graph_name, VertexInfoVector vertex_infos, - EdgeInfoVector edge_infos, const std::string& prefix, - std::shared_ptr version, + EdgeInfoVector edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version, const std::unordered_map& extra_info) : name_(graph_name), vertex_infos_(std::move(vertex_infos)), edge_infos_(std::move(edge_infos)), + labels_(labels), prefix_(prefix), version_(std::move(version)), extra_info_(extra_info) { @@ -1099,6 +1134,7 @@ class GraphInfo::Impl { std::string name_; VertexInfoVector vertex_infos_; EdgeInfoVector edge_infos_; + std::vector labels_; std::string prefix_; std::shared_ptr version_; std::unordered_map extra_info_; @@ -1108,16 +1144,20 @@ class GraphInfo::Impl { GraphInfo::GraphInfo( const std::string& graph_name, VertexInfoVector vertex_infos, - EdgeInfoVector edge_infos, const std::string& prefix, - std::shared_ptr version, + EdgeInfoVector edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version, const std::unordered_map& extra_info) : impl_(new Impl(graph_name, std::move(vertex_infos), std::move(edge_infos), - prefix, version, extra_info)) {} + labels, prefix, version, extra_info)) {} GraphInfo::~GraphInfo() = default; const std::string& GraphInfo::GetName() const { return impl_->name_; } +const std::vector& GraphInfo::GetLabels() const { + return impl_->labels_; +} + const std::string& GraphInfo::GetPrefix() const { return impl_->prefix_; } const std::shared_ptr& GraphInfo::version() const { @@ -1196,7 +1236,7 @@ Result> GraphInfo::AddVertex( } return std::make_shared( impl_->name_, AddVectorElement(impl_->vertex_infos_, vertex_info), - impl_->edge_infos_, impl_->prefix_, impl_->version_); + impl_->edge_infos_, impl_->labels_, impl_->prefix_, impl_->version_); } Result> GraphInfo::AddEdge( @@ -1210,20 +1250,20 @@ Result> GraphInfo::AddEdge( } return std::make_shared( impl_->name_, impl_->vertex_infos_, - AddVectorElement(impl_->edge_infos_, edge_info), impl_->prefix_, - impl_->version_); + AddVectorElement(impl_->edge_infos_, edge_info), impl_->labels_, + impl_->prefix_, impl_->version_); } std::shared_ptr CreateGraphInfo( const std::string& name, const VertexInfoVector& vertex_infos, - const EdgeInfoVector& edge_infos, const std::string& prefix, - std::shared_ptr version, + const EdgeInfoVector& edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version, const std::unordered_map& extra_info) { if (name.empty()) { return nullptr; } - return std::make_shared(name, vertex_infos, edge_infos, prefix, - version, extra_info); + return std::make_shared(name, vertex_infos, edge_infos, labels, + prefix, version, extra_info); } Result> GraphInfo::Load(const std::string& path) { @@ -1275,6 +1315,13 @@ Result GraphInfo::Dump() const { edge->GetDstType()) + ".edge.yaml"; } + if (impl_->labels_.size() > 0) { + node["labels"]; + for (const auto& label : impl_->labels_) { + node["labels"].PushBack(); + node["labels"][node["labels"].Size() - 1] = label; + } + } if (impl_->version_ != nullptr) { node["version"] = impl_->version_->ToString(); } diff --git a/cpp/src/graphar/graph_info.h b/cpp/src/graphar/graph_info.h index f6d086966..e3aee616b 100644 --- a/cpp/src/graphar/graph_info.h +++ b/cpp/src/graphar/graph_info.h @@ -180,12 +180,14 @@ class VertexInfo { * @param type The type of the vertex. * @param chunk_size The number of vertices in each vertex chunk. * @param property_groups The property group vector of the vertex. + * @param labels The labels of the vertex. * @param prefix The prefix of the vertex info. If left empty, the default * prefix will be set to the type of the vertex. * @param version The format version of the vertex info. */ explicit VertexInfo(const std::string& type, IdType chunk_size, const PropertyGroupVector& property_groups, + const std::vector& labels = {}, const std::string& prefix = "", std::shared_ptr version = nullptr); @@ -227,6 +229,12 @@ class VertexInfo { */ const std::shared_ptr& version() const; + /** + * Get the labels of the vertex. + * @return The labels of the vertex. + */ + const std::vector& GetLabels() const; + /** * Get the number of property groups of the vertex. * @@ -694,6 +702,7 @@ class GraphInfo { * @param graph_name The name of the graph. * @param vertex_infos The vertex info vector of the graph. * @param edge_infos The edge info vector of the graph. + * @param labels The vertex labels of the graph. * @param prefix The absolute path prefix to store chunk files of the graph. * Defaults to "./". * @param version The version of the graph info. @@ -701,7 +710,8 @@ class GraphInfo { */ explicit GraphInfo( const std::string& graph_name, VertexInfoVector vertex_infos, - EdgeInfoVector edge_infos, const std::string& prefix = "./", + EdgeInfoVector edge_infos, const std::vector& labels = {}, + const std::string& prefix = "./", std::shared_ptr version = nullptr, const std::unordered_map& extra_info = {}); @@ -753,6 +763,12 @@ class GraphInfo { */ const std::string& GetName() const; + /** + * @brief Get the vertex labels of the graph. + * @return The vertex labels of the graph. + */ + const std::vector& GetLabels() const; + /** * @brief Get the absolute path prefix of the chunk files. * @return The absolute path prefix of the chunk files. @@ -873,4 +889,4 @@ class GraphInfo { std::unique_ptr impl_; }; -} // namespace graphar +} // namespace graphar \ No newline at end of file diff --git a/cpp/src/graphar/high-level/graph_reader.cc b/cpp/src/graphar/high-level/graph_reader.cc index 9d9857d3c..ffc9188ff 100644 --- a/cpp/src/graphar/high-level/graph_reader.cc +++ b/cpp/src/graphar/high-level/graph_reader.cc @@ -18,6 +18,10 @@ */ #include "graphar/high-level/graph_reader.h" +#include "../label.h" +#include "arrow/array.h" +#include "graph_reader.h" +#include "graphar/api/arrow_reader.h" #include "graphar/convert_to_arrow_type.h" #include "graphar/types.h" @@ -94,6 +98,284 @@ Vertex::Vertex(IdType id, } } +Result VertexIter::label(const std::string& label) noexcept { + std::shared_ptr column(nullptr); + label_reader_.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk()); + column = util::GetArrowColumnByName(chunk_table, label); + if (column != nullptr) { + auto array = util::GetArrowArrayByChunkIndex(column, 0); + auto bool_array = std::dynamic_pointer_cast(array); + return bool_array->Value(0); + } + return Status::KeyError("label with name ", label, + " does not exist in the vertex."); +} + +Result> VertexIter::label() noexcept { + std::shared_ptr column(nullptr); + std::vector vertex_label; + if (is_filtered_) + label_reader_.seek(filtered_ids_[cur_offset_]); + else + label_reader_.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk()); + for (auto label : labels_) { + column = util::GetArrowColumnByName(chunk_table, label); + if (column != nullptr) { + auto array = util::GetArrowArrayByChunkIndex(column, 0); + auto bool_array = std::dynamic_pointer_cast(array); + if (bool_array->Value(0)) { + vertex_label.push_back(label); + } + } + } + return vertex_label; +} + +static inline bool IsValid(bool* state, int column_number) { + for (int i = 0; i < column_number; ++i) { + // AND case + if (!state[i]) + return false; + // OR case + // if (state[i]) return true; + } + // AND case + return true; + // OR case + // return false; +} + +Result> VerticesCollection::filter( + std::vector filter_labels) const { + std::vector indices; + const int TOT_ROWS_NUM = vertex_num_; + const int CHUNK_SIZE = vertex_info_->GetChunkSize(); + const int TOT_LABEL_NUM = labels_.size(); + const int TESTED_LABEL_NUM = filter_labels.size(); + std::vector tested_label_ids; + + for (const auto& filter_label : filter_labels) { + auto it = std::find(labels_.begin(), labels_.end(), filter_label); + if (it != labels_.end()) { + tested_label_ids.push_back(std::distance(labels_.begin(), it)); + } + } + + uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1]; + memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1)); + int total_count = 0; + int row_num; + for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) { + row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE); + std::string new_filename = + prefix_ + vertex_info_->GetPrefix() + "labels/chunk"; + int count = read_parquet_file_and_get_valid_indices( + new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM, + tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap, + QUERY_TYPE::INDEX); + total_count += count; + } + // std::cout << "Total valid count: " << total_count << std::endl; + std::vector indices64; + + for (int value : indices) { + indices64.push_back(static_cast(value)); + } + + return indices64; +} + +Result> VerticesCollection::filter_by_acero( + std::vector filter_labels) const { + std::vector indices; + const int TOT_ROWS_NUM = vertex_num_; + const int CHUNK_SIZE = vertex_info_->GetChunkSize(); + const int TOT_LABEL_NUM = labels_.size(); + const int TESTED_LABEL_NUM = filter_labels.size(); + std::vector tested_label_ids; + for (const auto& filter_label : filter_labels) { + auto it = std::find(labels_.begin(), labels_.end(), filter_label); + if (it != labels_.end()) { + tested_label_ids.push_back(std::distance(labels_.begin(), it)); + } + } + int total_count = 0; + int row_num; + std::vector> filters; + std::shared_ptr combined_filter = nullptr; + + for (const auto& label : filter_labels) { + filters.emplace_back( + graphar::_Equal(graphar::_Property(label), graphar::_Literal(true))); + } + + for (const auto& filter : filters) { + if (!combined_filter) { + combined_filter = graphar::_And(filter, filter); + } else { + combined_filter = graphar::_And(combined_filter, filter); + } + } + + std::string new_filename = + prefix_ + vertex_info_->GetPrefix() + "labels/chunk"; + auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make( + vertex_info_, labels_, prefix_, {}); + auto filter_reader = maybe_filter_reader.value(); + for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) { + filter_reader->Filter(combined_filter); + auto filter_result = filter_reader->GetLabelChunk(); + auto filter_table = filter_result.value(); + total_count += filter_table->num_rows(); + filter_reader->next_chunk(); + } + // std::cout << "Total valid count: " << total_count << std::endl; + std::vector indices64; + + for (int value : indices) { + indices64.push_back(static_cast(value)); + } + + return indices64; +} + +Result> +VerticesCollection::verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter({filter_label}).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithLabelbyAcero( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter_by_acero({filter_label}).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& vertices_collection) { + auto filtered_ids = vertices_collection->filter({filter_label}).value(); + if (vertices_collection->is_filtered_) { + std::unordered_set origin_set( + vertices_collection->filtered_ids_.begin(), + vertices_collection->filtered_ids_.end()); + std::unordered_set intersection; + for (int num : filtered_ids) { + if (origin_set.count(num)) { + intersection.insert(num); + } + } + filtered_ids = + std::vector(intersection.begin(), intersection.end()); + } + return std::make_shared(vertices_collection->vertex_info_, + vertices_collection->prefix_, + true, filtered_ids); +} + +Result> +VerticesCollection::verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter(filter_labels).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithMultipleLabelsbyAcero( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type) { + auto prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); + auto vertices_collection = + std::make_shared(vertex_info, prefix); + vertices_collection->filtered_ids_ = + vertices_collection->filter_by_acero(filter_labels).value(); + vertices_collection->is_filtered_ = true; + return vertices_collection; +} + +Result> +VerticesCollection::verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& vertices_collection) { + auto filtered_ids = vertices_collection->filter(filter_labels).value(); + if (vertices_collection->is_filtered_) { + std::unordered_set origin_set( + vertices_collection->filtered_ids_.begin(), + vertices_collection->filtered_ids_.end()); + std::unordered_set intersection; + for (int num : filtered_ids) { + if (origin_set.count(num)) { + intersection.insert(num); + } + } + filtered_ids = + std::vector(intersection.begin(), intersection.end()); + } + return std::make_shared(vertices_collection->vertex_info_, + vertices_collection->prefix_, + true, filtered_ids); +} + +// Result> +// VerticesCollection::verticesWithLabelAndProperty( +// const std::string& filter_label, +// const std::vector>& filter_properties, +// const std::vector& filter_properties_val, +// const std::shared_ptr& graph_info, +// const std::string& type) { +// auto prefix = graph_info->GetPrefix(); +// auto vertex_info = graph_info->GetVertexInfo(type); +// auto labels = vertex_info->GetLabels(); +// auto vertices_collection = +// std::make_shared(vertex_info, prefix); auto +// filtered_ids = vertices_collection->filter({filter_label}); +// std::vector new_filtered_ids; +// for (auto it = vertices_collection->begin(); it != +// vertices_collection->end(); ++it) { +// for(auto pg : filter_properties) { +// if(it.property<>(pg->name) != filter_properties_val) { +// continue; +// } +// } +// new_filtered_ids.push_back(it.id()); +// } + +// return std::make_shared(vertex_info, prefix,true, +// filtered_ids.value()); + +// } + template Result Vertex::property(const std::string& property) const { if constexpr (std::is_final::value) { diff --git a/cpp/src/graphar/high-level/graph_reader.h b/cpp/src/graphar/high-level/graph_reader.h index 20e7d8f8d..cb96ef539 100644 --- a/cpp/src/graphar/high-level/graph_reader.h +++ b/cpp/src/graphar/high-level/graph_reader.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include #include @@ -74,6 +75,13 @@ class Vertex { template Result property(const std::string& property) const; + /** + * @brief Get the label of the vertex. + * @return Result: The label of the vertex. + */ + template + Result label() const; + /** * @brief Return true if value at the property is valid (not null). * @@ -172,40 +180,79 @@ class VertexIter { * @param offset The current offset of the readers. */ explicit VertexIter(const std::shared_ptr& vertex_info, - const std::string& prefix, IdType offset) noexcept { + const std::string& prefix, IdType offset, + const std::vector& labels, + const bool& is_filtered = false, + const std::vector& filtered_ids = {}) noexcept { + if (!labels.empty()) { + labels_ = labels; + label_reader_ = + VertexPropertyArrowChunkReader(vertex_info, labels, prefix); + } for (const auto& pg : vertex_info->GetPropertyGroups()) { readers_.emplace_back(vertex_info, pg, prefix); } + is_filtered_ = is_filtered; + filtered_ids_ = filtered_ids; cur_offset_ = offset; } /** Copy constructor. */ VertexIter(const VertexIter& other) - : readers_(other.readers_), cur_offset_(other.cur_offset_) {} + : readers_(other.readers_), + cur_offset_(other.cur_offset_), + labels_(other.labels_), + label_reader_(other.label_reader_), + is_filtered_(other.is_filtered_), + filtered_ids_(other.filtered_ids_) {} /** Construct and return the vertex of the current offset. */ Vertex operator*() noexcept { - for (auto& reader : readers_) { - reader.seek(cur_offset_); + if (is_filtered_) { + for (auto& reader : readers_) { + reader.seek(filtered_ids_[cur_offset_]); + } + } else { + for (auto& reader : readers_) { + reader.seek(cur_offset_); + } } + return Vertex(cur_offset_, readers_); } /** Get the vertex id of the current offset. */ - IdType id() { return cur_offset_; } + IdType id() { + if (is_filtered_) { + return filtered_ids_[cur_offset_]; + } else + return cur_offset_; + } /** Get the value for a property of the current vertex. */ template Result property(const std::string& property) noexcept { std::shared_ptr column(nullptr); - for (auto& reader : readers_) { - reader.seek(cur_offset_); - GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); - column = util::GetArrowColumnByName(chunk_table, property); - if (column != nullptr) { - break; + if (is_filtered_) { + for (auto& reader : readers_) { + reader.seek(filtered_ids_[cur_offset_]); + GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); + column = util::GetArrowColumnByName(chunk_table, property); + if (column != nullptr) { + break; + } + } + } else { + for (auto& reader : readers_) { + reader.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); + column = util::GetArrowColumnByName(chunk_table, property); + if (column != nullptr) { + break; + } } } + if (column != nullptr) { auto array = util::GetArrowArrayByChunkIndex(column, 0); GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array)); @@ -215,6 +262,11 @@ class VertexIter { " does not exist in the vertex."); } + Result label(const std::string& label) noexcept; + + /** Get the labels of the current vertex. */ + Result> label() noexcept; + /** The prefix increment operator. */ VertexIter& operator++() noexcept { ++cur_offset_; @@ -253,7 +305,11 @@ class VertexIter { private: std::vector readers_; + VertexPropertyArrowChunkReader label_reader_; + std::vector labels_; IdType cur_offset_; + bool is_filtered_; + std::vector filtered_ids_; }; /** @@ -266,11 +322,18 @@ class VerticesCollection { * @brief Initialize the VerticesCollection. * * @param vertex_info The vertex info that describes the vertex type. + * @param labels The labels of the vertex. * @param prefix The absolute prefix. */ explicit VerticesCollection(const std::shared_ptr& vertex_info, - const std::string& prefix) - : vertex_info_(std::move(vertex_info)), prefix_(prefix) { + const std::string& prefix, + const bool is_filtered = false, + const std::vector filtered_ids = {}) + : vertex_info_(std::move(vertex_info)), + prefix_(prefix), + labels_(vertex_info->GetLabels()), + is_filtered_(is_filtered), + filtered_ids_(filtered_ids) { // get the vertex num std::string base_dir; GAR_ASSIGN_OR_RAISE_ERROR(auto fs, @@ -283,21 +346,122 @@ class VerticesCollection { } /** The iterator pointing to the first vertex. */ - VertexIter begin() noexcept { return VertexIter(vertex_info_, prefix_, 0); } + VertexIter begin() noexcept { + return VertexIter(vertex_info_, prefix_, 0, labels_, is_filtered_, + filtered_ids_); + } /** The iterator pointing to the past-the-end element. */ VertexIter end() noexcept { - return VertexIter(vertex_info_, prefix_, vertex_num_); + if (is_filtered_) + return VertexIter(vertex_info_, prefix_, filtered_ids_.size(), labels_, + is_filtered_, filtered_ids_); + return VertexIter(vertex_info_, prefix_, vertex_num_, labels_, is_filtered_, + filtered_ids_); } /** The iterator pointing to the vertex with specific id. */ - VertexIter find(IdType id) { return VertexIter(vertex_info_, prefix_, id); } + VertexIter find(IdType id) { + return VertexIter(vertex_info_, prefix_, id, labels_); + } /** Get the number of vertices in the collection. */ - size_t size() const noexcept { return vertex_num_; } + size_t size() const noexcept { + if (is_filtered_) + return filtered_ids_.size(); + else + return vertex_num_; + } + + /** The vertex id list that satisfies the label filter condition. */ + Result> filter( + std::vector filter_labels) const; + + Result> filter_by_acero( + std::vector filter_labels) const; + + /** + * @brief Query vertices with a specific label + * + * @param filter_label The label to query vertices by + * @param graph_info A smart pointer to GraphInfo that contains details about + * the graph + * @param type The type of vertices to query + * @return A VerticesCollection containing all vertices that have the + * specified label + */ + static Result> verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type); + + static Result> verticesWithLabelbyAcero( + const std::string& filter_label, + const std::shared_ptr& graph_info, const std::string& type); /** - * @brief Construct a VerticesCollection from graph info and vertex type. + * @brief Query vertices with a specific label within a given collection + * + * @param filter_label The label to query vertices by + * @param vertices_collection The collection of vertices to search within + * @return A VerticesCollection containing all vertices from the specified + * collection that have the specified label + */ + static Result> verticesWithLabel( + const std::string& filter_label, + const std::shared_ptr& vertices_collection); + + /** + * @brief Query vertices with multiple labels + * + * @param filter_labels A vector of labels to query vertices by + * @param graph_info A smart pointer to GraphInfo that contains details about + * the graph + * @param type The type of vertices to query + * @return A VerticesCollection containing all vertices that have all of the + * specified labels + */ + static Result> verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type); + + static Result> + verticesWithMultipleLabelsbyAcero( + const std::vector& filter_labels, + const std::shared_ptr& graph_info, const std::string& type); + + /** + * @brief Query vertices with multiple labels within a given collection + * + * @param filter_labels A vector of labels to query vertices by + * @param vertices_collection The collection of vertices to search within + * @return A VerticesCollection containing all vertices from the specified + * collection that have all of the specified labels + */ + static Result> verticesWithMultipleLabels( + const std::vector& filter_labels, + const std::shared_ptr& vertices_collection); + + /** + * @brief Query vertices with a specific label and properties + * + * @param filter_label The label to query vertices by + * @param properties A vector of shared pointers to Property objects that the + * vertices should match + * @param graph_info A smart pointer to GraphInfo that contains details about + * the graph + * @param type The type of vertices to query + * @return A VerticesCollection containing all vertices that have the + * specified label and match the given properties + */ + static Result> + verticesWithLabelAndProperty( + const std::string& filter_label, + const std::vector>& filter_properties, + const std::vector& filter_properties_val, + const std::shared_ptr& graph_info, const std::string& type); + + /** + * @brief Construct a VerticesCollection from graph info and vertex label. * * @param graph_info The graph info. * @param type The vertex type. @@ -305,6 +469,7 @@ class VerticesCollection { static Result> Make( const std::shared_ptr& graph_info, const std::string& type) { auto vertex_info = graph_info->GetVertexInfo(type); + auto labels = vertex_info->GetLabels(); if (!vertex_info) { return Status::KeyError("The vertex ", type, " doesn't exist."); } @@ -315,6 +480,9 @@ class VerticesCollection { private: std::shared_ptr vertex_info_; std::string prefix_; + std::vector labels_; + bool is_filtered_; + std::vector filtered_ids_; IdType vertex_num_; }; @@ -710,6 +878,16 @@ class EdgesCollection { * @param src_type The source vertex type. * @param edge_type The edge type. * @param dst_type The destination vertex type. +<<<<<<< HEAD + * @param src_type The source vertex type. + * @param edge_type The edge type. + * @param dst_type The destination vertex type. +======= + * @param src_type The source vertex label. + * @param edge_type The edge label. + * @param dst_type The destination vertex label. +>>>>>>> bbd562e (feat!(format):change the naming of vertex/edge unique identify +type from `label` to `type` (#605)) * @param adj_list_type The type of adjList. * @param vertex_chunk_begin The index of the begin vertex chunk, default 0. * @param vertex_chunk_end The index of the end vertex chunk (not included), diff --git a/cpp/src/graphar/label.cc b/cpp/src/graphar/label.cc new file mode 100644 index 000000000..0027d1649 --- /dev/null +++ b/cpp/src/graphar/label.cc @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "label.h" + +#include +#include +#include +#include +#include +#include + +/// Read a parquet file by ParquetReader & get valid indices +/// The first column_num labels are concerned. +int read_parquet_file_and_get_valid_indices( + const char* parquet_filename, const int row_num, const int tot_label_num, + const int tested_label_num, std::vector tested_label_ids, + const std::function& IsValid, int chunk_idx, + int chunk_size, std::vector* indices, uint64_t* bitmap, + const QUERY_TYPE query_type) { + /* std::cout << "Reading a bool plain/RLE encoded parquet file: " << + parquet_filename + << std::endl + << "row_num = " << row_num << ", tot_label_num = " << tot_label_num + << ", tested_label_num = " << tested_label_num << std::endl; */ + + // Create a ParquetReader instance + std::unique_ptr parquet_reader = + parquet::ParquetFileReader::OpenFile( + parquet_filename + std::to_string(chunk_idx), false); + + // Get the File MetaData + std::shared_ptr file_metadata = + parquet_reader->metadata(); + int row_group_count = file_metadata->num_row_groups(); + int num_columns = file_metadata->num_columns(); + + // Initialize the column row counts + std::vector col_row_counts(num_columns, 0); + bool** value = new bool*[num_columns]; + for (int i = 0; i < num_columns; i++) { + value[i] = new bool[row_num]; + } + + // Iterate over all the RowGroups in the file + for (int rg = 0; rg < row_group_count; ++rg) { + // Get the RowGroup Reader + std::shared_ptr row_group_reader = + parquet_reader->RowGroup(rg); + + int64_t values_read = 0; + int64_t rows_read = 0; + std::shared_ptr column_reader; + + ARROW_UNUSED(rows_read); // prevent warning in release build + + // Read the label columns + for (int k = 0; k < tested_label_num; k++) { + int col_id = tested_label_ids[k]; + // Get the Column Reader for the Bool column + column_reader = row_group_reader->Column(col_id); + parquet::BoolReader* bool_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + while (bool_reader->HasNext()) { + // Read BATCH_SIZE values at a time. The number of rows read is + // returned. values_read contains the number of non-null rows + + rows_read = bool_reader->ReadBatch(BATCH_SIZE, nullptr, nullptr, + value[k] + col_row_counts[col_id], + &values_read); + + // There are no NULL values in the rows written + col_row_counts[col_id] += rows_read; + } + } + } + + bool state[tot_label_num]; + int count = 0; + int offset = chunk_idx * chunk_size; + for (int i = 0; i < row_num; i++) { + for (int j = 0; j < tested_label_num; j++) { + state[j] = value[j][i]; + } + if (IsValid(state, tested_label_num)) { + count++; + if (query_type == QUERY_TYPE::INDEX) + + indices->push_back(i + offset); + else if (query_type == QUERY_TYPE::BITMAP) + SetBitmap(bitmap, i); + } + } + + // destroy the allocated space + for (int i = 0; i < num_columns; i++) { + delete[] value[i]; + } + delete[] value; + + // std::cout << "The parquet file is read successfully!" << std::endl << + // std::endl; return the valid count + return count; +} diff --git a/cpp/src/graphar/label.h b/cpp/src/graphar/label.h new file mode 100644 index 000000000..df79dc84b --- /dev/null +++ b/cpp/src/graphar/label.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PARQUET_EXAMPLES_GRAPHAR_LABEL_H +#define PARQUET_EXAMPLES_GRAPHAR_LABEL_H + +#include +#include +#include +#include +#include + +#include +#include +#include + +using parquet::ConvertedType; +using parquet::Encoding; +using parquet::Repetition; +using parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +constexpr int BATCH_SIZE = 1024; // the batch size + +/// The query type +enum QUERY_TYPE { + COUNT, // return the number of valid vertices + INDEX, // return the indices of valid vertices + BITMAP, // return the bitmap of valid vertices + ADAPTIVE // adaptively return indices or bitmap +}; + +/// Set bit +static inline void SetBitmap(uint64_t* bitmap, const int index) { + bitmap[index >> 6] |= (1ULL << (index & 63)); +} + +/// Set bit in a range +static inline void SetBitmap(uint64_t* bitmap, const int start, const int end) { + int pos1 = start >> 6, pos2 = end >> 6; + if (pos1 == pos2) { + bitmap[pos1] |= (1ULL << (end & 63)) - (1ULL << (start & 63)); + } else { + bitmap[pos1] |= ~((1ULL << (start & 63)) - 1); + bitmap[pos2] |= (1ULL << (end & 63)) - 1; + for (int i = pos1 + 1; i < pos2; ++i) { + bitmap[i] = ~0ULL; + } + } +} + +/// Get bit +static inline bool GetBit(const uint64_t* bitmap, const int index) { + return (bitmap[index >> 6]) & (1ULL << (index & 63)); +} + +int read_parquet_file_and_get_valid_indices( + const char* parquet_filename, const int row_num, const int tot_label_num, + const int tested_label_num, std::vector tested_label_ids, + const std::function& IsValid, int chunk_idx, + int chunk_size, std::vector* indices = nullptr, + uint64_t* bitmap = nullptr, const QUERY_TYPE query_type = COUNT); + +#endif // LABEL_H \ No newline at end of file diff --git a/cpp/src/graphar/util.cc b/cpp/src/graphar/util.cc index e477188d6..7a3b1a829 100644 --- a/cpp/src/graphar/util.cc +++ b/cpp/src/graphar/util.cc @@ -86,6 +86,9 @@ Result GetArrowArrayData( } else if (array->type()->Equals(arrow::null())) { return reinterpret_cast( std::dynamic_pointer_cast(array).get()); + } else if (array->type()->Equals(arrow::boolean())) { + return reinterpret_cast( + std::dynamic_pointer_cast(array).get()); } else { return Status::TypeError("Array type - ", array->type()->ToString(), " is not supported yet..."); diff --git a/cpp/test/test_info.cc b/cpp/test/test_info.cc index c0349ee03..82d839e3a 100644 --- a/cpp/test/test_info.cc +++ b/cpp/test/test_info.cc @@ -183,7 +183,7 @@ TEST_CASE_METHOD(GlobalFixture, "VertexInfo") { {Property("p0", int32(), true), Property("p1", string(), false)}, FileType::CSV, "p0_p1/"); auto vertex_info = - CreateVertexInfo(type, chunk_size, {pg}, "test_vertex", version); + CreateVertexInfo(type, chunk_size, {pg}, {}, "test_vertex", version); SECTION("Basics") { REQUIRE(vertex_info->GetType() == type); @@ -224,24 +224,25 @@ TEST_CASE_METHOD(GlobalFixture, "VertexInfo") { auto invalid_pg = CreatePropertyGroup({Property("p0", nullptr, true)}, FileType::CSV); auto invalid_vertex_info0 = CreateVertexInfo(type, chunk_size, {invalid_pg}, - "test_vertex/", version); + {}, "test_vertex/", version); REQUIRE(invalid_vertex_info0->IsValidated() == false); - VertexInfo invalid_vertex_info1("", chunk_size, {pg}, "test_vertex/", + VertexInfo invalid_vertex_info1("", chunk_size, {pg}, {}, "test_vertex/", version); REQUIRE(invalid_vertex_info1.IsValidated() == false); - VertexInfo invalid_vertex_info2(type, 0, {pg}, "test_vertex/", version); + VertexInfo invalid_vertex_info2(type, 0, {pg}, {}, "test_vertex/", version); REQUIRE(invalid_vertex_info2.IsValidated() == false); // check if prefix empty auto vertex_info_empty_prefix = - CreateVertexInfo(type, chunk_size, {pg}, "", version); + CreateVertexInfo(type, chunk_size, {pg}, {}, "", version); REQUIRE(vertex_info_empty_prefix->IsValidated() == true); } SECTION("CreateVertexInfo") { - auto vertex_info3 = CreateVertexInfo("", chunk_size, {pg}, "test_vertex/"); + auto vertex_info3 = + CreateVertexInfo("", chunk_size, {pg}, {}, "test_vertex/"); REQUIRE(vertex_info3 == nullptr); - auto vertex_info4 = CreateVertexInfo(type, 0, {pg}, "test_vertex/"); + auto vertex_info4 = CreateVertexInfo(type, 0, {pg}, {}, "test_vertex/"); REQUIRE(vertex_info4 == nullptr); } @@ -267,7 +268,7 @@ version: gar/v1 )"; REQUIRE(dump_result.value() == expected); auto vertex_info_empty_version = - CreateVertexInfo(type, chunk_size, {pg}, "test_vertex/"); + CreateVertexInfo(type, chunk_size, {pg}, {}, "test_vertex/"); REQUIRE(vertex_info_empty_version->Dump().status().ok()); } @@ -521,7 +522,7 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { {Property("p0", int32(), true), Property("p1", string(), false)}, FileType::CSV, "p0_p1/"); auto vertex_info = - CreateVertexInfo("test_vertex", 100, {pg}, "test_vertex/", version); + CreateVertexInfo("test_vertex", 100, {pg}, {}, "test_vertex/", version); std::unordered_map extra_info = { {"category", "test graph"}}; auto edge_info = @@ -529,7 +530,7 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { {CreateAdjacentList(AdjListType::ordered_by_source, FileType::CSV, "adj_list/")}, {pg}, "test_edge/", version); - auto graph_info = CreateGraphInfo(name, {vertex_info}, {edge_info}, + auto graph_info = CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "test_graph/", version, extra_info); SECTION("Basics") { @@ -544,7 +545,7 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { SECTION("ExtraInfo") { auto graph_info_with_extra_info = - CreateGraphInfo(name, {vertex_info}, {edge_info}, "test_graph/", + CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "test_graph/", version, {{"key1", "value1"}, {"key2", "value2"}}); const auto& extra_info = graph_info_with_extra_info->GetExtraInfo(); REQUIRE(extra_info.size() == 2); @@ -580,9 +581,9 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { SECTION("IsValidated") { REQUIRE(graph_info->IsValidated() == true); auto invalid_vertex_info = - CreateVertexInfo("", 100, {pg}, "test_vertex/", version); + CreateVertexInfo("", 100, {pg}, {}, "test_vertex/", version); auto invalid_graph_info0 = CreateGraphInfo( - name, {invalid_vertex_info}, {edge_info}, "test_graph/", version); + name, {invalid_vertex_info}, {edge_info}, {}, "test_graph/", version); REQUIRE(invalid_graph_info0->IsValidated() == false); auto invalid_edge_info = CreateEdgeInfo("", "knows", "person", 1024, 100, 100, true, @@ -590,23 +591,23 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { FileType::CSV, "adj_list/")}, {pg}, "test_edge/", version); auto invalid_graph_info1 = CreateGraphInfo( - name, {vertex_info}, {invalid_edge_info}, "test_graph/", version); + name, {vertex_info}, {invalid_edge_info}, {}, "test_graph/", version); REQUIRE(invalid_graph_info1->IsValidated() == false); - GraphInfo invalid_graph_info2("", {vertex_info}, {edge_info}, "test_graph/", - version); + GraphInfo invalid_graph_info2("", {vertex_info}, {edge_info}, {}, + "test_graph/", version); REQUIRE(invalid_graph_info2.IsValidated() == false); - GraphInfo invalid_graph_info3(name, {vertex_info}, {edge_info}, "", + GraphInfo invalid_graph_info3(name, {vertex_info}, {edge_info}, {}, "", version); REQUIRE(invalid_graph_info3.IsValidated() == false); // check if prefix empty, graph_info with empty prefix is invalid auto graph_info_with_empty_prefix = - CreateGraphInfo(name, {vertex_info}, {edge_info}, "", version); + CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "", version); REQUIRE(graph_info_with_empty_prefix->IsValidated() == false); } SECTION("CreateGraphInfo") { auto graph_info_empty_name = - CreateGraphInfo("", {vertex_info}, {edge_info}, "test_graph/"); + CreateGraphInfo("", {vertex_info}, {edge_info}, {}, "test_graph/"); REQUIRE(graph_info_empty_name == nullptr); } @@ -626,7 +627,7 @@ version: gar/v1 )"; REQUIRE(dump_result.value() == expected); auto graph_info_empty_version = - CreateGraphInfo(name, {vertex_info}, {edge_info}, "test_graph/"); + CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "test_graph/"); REQUIRE(graph_info_empty_version->Dump().status().ok()); } @@ -638,8 +639,8 @@ version: gar/v1 } SECTION("AddVertex") { - auto vertex_info2 = - CreateVertexInfo("test_vertex2", 100, {pg}, "test_vertex2/", version); + auto vertex_info2 = CreateVertexInfo("test_vertex2", 100, {pg}, {}, + "test_vertex2/", version); auto maybe_extend_info = graph_info->AddVertex(vertex_info2); REQUIRE(maybe_extend_info.status().ok()); auto extend_info = maybe_extend_info.value(); @@ -778,25 +779,25 @@ version: gar/v1 /* TODO(acezen): need to mock S3 server to test this case, this private service is not available for public access. - -TEST_CASE_METHOD(GlobalFixture, "LoadFromS3") { - // explicitly call InitS3 to initialize S3 APIs before using - // S3 file system. - InitializeS3(); - std::string path = - "s3://graphscope/graphar/ldbc/ldbc.graph.yml" - "?endpoint_override=graphscope.oss-cn-beijing.aliyuncs.com"; - auto graph_info_result = GraphInfo::Load(path); - std::cout << graph_info_result.status().message() << std::endl; - REQUIRE(!graph_info_result.has_error()); - auto graph_info = graph_info_result.value(); - REQUIRE(graph_info->GetName() == "ldbc"); - const auto& vertex_infos = graph_info->GetVertexInfos(); - const auto& edge_infos = graph_info->GetEdgeInfos(); - REQUIRE(vertex_infos.size() == 8); - REQUIRE(edge_infos.size() == 23); - // explicitly call FinalizeS3 to avoid memory leak - FinalizeS3(); -} */ +// TEST_CASE_METHOD(GlobalFixture, "LoadFromS3") { +// // explicitly call InitS3 to initialize S3 APIs before using +// // S3 file system. +// InitializeS3(); +// std::string path = +// "s3://graphar/ldbc/ldbc.graph.yml" +// "?endpoint_override=graphscope.oss-cn-beijing.aliyuncs.com"; +// auto graph_info_result = GraphInfo::Load(path); +// std::cout << graph_info_result.status().message() << std::endl; +// REQUIRE(!graph_info_result.has_error()); +// auto graph_info = graph_info_result.value(); +// REQUIRE(graph_info->GetName() == "ldbc"); +// const auto& vertex_infos = graph_info->GetVertexInfos(); +// const auto& edge_infos = graph_info->GetEdgeInfos(); +// REQUIRE(vertex_infos.size() == 8); +// REQUIRE(edge_infos.size() == 23); +// // explicitly call FinalizeS3 to avoid memory leak +// FinalizeS3(); +// } + } // namespace graphar diff --git a/cpp/test/test_multi_label.cc b/cpp/test/test_multi_label.cc new file mode 100644 index 000000000..a4f7952de --- /dev/null +++ b/cpp/test/test_multi_label.cc @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/csv/api.h" +#include "arrow/filesystem/api.h" +#include "arrow/io/api.h" +#include "arrow/stl.h" +#include "arrow/util/uri.h" +#include "parquet/arrow/writer.h" + +#include "./util.h" +#include "graphar/api/high_level_writer.h" + +#include + +std::shared_ptr read_csv_to_table(const std::string& filename) { + arrow::csv::ReadOptions read_options{}; + arrow::csv::ParseOptions parse_options{}; + arrow::csv::ConvertOptions convert_options{}; + + parse_options.delimiter = '|'; + + auto input = + arrow::io::ReadableFile::Open(filename, arrow::default_memory_pool()) + .ValueOrDie(); + + auto reader = arrow::csv::TableReader::Make(arrow::io::default_io_context(), + input, read_options, + parse_options, convert_options) + .ValueOrDie(); + + std::shared_ptr table; + table = reader->Read().ValueOrDie(); + + return table; +} + +namespace graphar { +TEST_CASE_METHOD(GlobalFixture, "test_multi_label_builder") { + std::cout << "Test multi label builder" << std::endl; + + // construct graph information from file + // std::string path = + // test_data_dir + "/icij/parquet/" + "icij-offshoreleaks.graph.yml"; + std::string path = test_data_dir + "/ldbc/parquet/" + "ldbc.graph.yml"; + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto vertex_info = graph_info->GetVertexInfo("organisation"); + + // std::vector labels = graph_info->GetLabels(); + auto labels = vertex_info->GetLabels(); + + std::unordered_map code; + + std::vector> label_column_data; + + // read labels csv file as arrow table + auto table = + read_csv_to_table(test_data_dir + "/ldbc/modified_organisation_0_0.csv"); + // auto table = read_csv_to_table(test_data_dir + + // "/ldbc_large/modified_organisation_0_0.csv"); + std::string table_message = table->ToString(); + + auto schema = table->schema(); + std::cout << schema->ToString() << std::endl; + // std::cout << table_message << std::endl; + + // write arrow table as chunk parquet + auto maybe_writer = VertexPropertyWriter::Make( + vertex_info, + "/workspaces/incubator-graphar/cpp/build/testing/ldbc/parquet/"); + REQUIRE(!maybe_writer.has_error()); + auto writer = maybe_writer.value(); + REQUIRE(writer->WriteTable(table, 0).ok()); + REQUIRE(writer->WriteVerticesNum(table->num_rows()).ok()); +} +} // namespace graphar diff --git a/docs/libraries/cpp/examples/bgl.md b/docs/libraries/cpp/examples/bgl.md index 3b4ffee7c..669386c8f 100644 --- a/docs/libraries/cpp/examples/bgl.md +++ b/docs/libraries/cpp/examples/bgl.md @@ -86,9 +86,9 @@ std::vector property_vector = {cc}; auto group = graphar::CreatePropertyGroup(property_vector, graphar::FileType::PARQUET); // construct the new vertex info -std::string vertex_label = "cc_result", vertex_prefix = "result/"; +std::string vertex_type = "cc_result", vertex_prefix = "result/"; int chunk_size = 100; -auto new_info = graphar::CreateVertexInfo(vertex_label, chunk_size, {group}, vertex_prefix); +auto new_info = graphar::CreateVertexInfo(vertex_type, chunk_size, {group}, vertex_prefix); // access the vertices via the index map and vertex iterator of BGL typedef boost::property_map::type IndexMap; diff --git a/docs/libraries/cpp/examples/snap-to-graphar.md b/docs/libraries/cpp/examples/snap-to-graphar.md index 13880ce71..1ae59c76b 100644 --- a/docs/libraries/cpp/examples/snap-to-graphar.md +++ b/docs/libraries/cpp/examples/snap-to-graphar.md @@ -32,11 +32,11 @@ storage of the vertex information file. auto version = graphar::InfoVersion::Parse("gar/v1").value(); // meta info -std::string vertex_label = "node", vertex_prefix = "vertex/node/"; +std::string vertex_type = "node", vertex_prefix = "vertex/node/"; // create vertex info auto vertex_info = graphar::CreateVertexInfo( - vertex_label, VERTEX_CHUNK_SIZE, {}, vertex_prefix, version); + vertex_type, VERTEX_CHUNK_SIZE, {}, vertex_prefix, version); // save & dump vertex info ASSERT(!vertex_info->Dump().has_error());