diff --git a/CMakeLists.txt b/CMakeLists.txt index 10c7b6f22b..94ecfe9c50 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.13) +cmake_minimum_required(VERSION 3.16) project(TuGraph C CXX) message("Community version.") diff --git a/src/BuildLGraphServer.cmake b/src/BuildLGraphServer.cmake index c3fcd3394f..17d022f976 100644 --- a/src/BuildLGraphServer.cmake +++ b/src/BuildLGraphServer.cmake @@ -51,6 +51,7 @@ add_library(${TARGET_SERVER_LIB} STATIC import/import_online.cpp import/import_v2.cpp import/import_v3.cpp + import/graphar_parser.cpp restful/server/rest_server.cpp restful/server/stdafx.cpp http/http_server.cpp @@ -72,6 +73,7 @@ if (NOT (CMAKE_SYSTEM_NAME STREQUAL "Darwin")) lgraph_cypher_lib geax_isogql bolt + gar # begin static linking -Wl,-Bstatic cpprest @@ -111,6 +113,7 @@ else () cpprest boost_thread boost_chrono + gar profiler snappy pthread diff --git a/src/import/block_parser.h b/src/import/block_parser.h new file mode 100644 index 0000000000..c1f2b26db5 --- /dev/null +++ b/src/import/block_parser.h @@ -0,0 +1,30 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#pragma once + +#include "core/data_type.h" + +namespace lgraph { +namespace import_v2 { + +// The base class for ColumnParser, JsonLinesParser and GraphArParser +class BlockParser { + public: + virtual bool ReadBlock(std::vector>& buf) = 0; + virtual ~BlockParser() {} +}; + +} // namespace import_v2 +} // namespace lgraph diff --git a/src/import/column_parser.h b/src/import/column_parser.h index d5b784de44..4ca240f132 100644 --- a/src/import/column_parser.h +++ b/src/import/column_parser.h @@ -18,22 +18,14 @@ #include "core/data_type.h" #include "core/field_data_helper.h" +#include "import/block_parser.h" #include "import/import_config_parser.h" #include "restful/server/json_convert.h" #include "tools/json.hpp" -#include -#include - namespace lgraph { namespace import_v2 { -class BlockParser { - public: - virtual bool ReadBlock(std::vector>& buf) = 0; - virtual ~BlockParser(){} -}; - /** Parse each line of a csv into a vector of FieldData, excluding SKIP columns. * vector specifies what each column contains. */ @@ -414,231 +406,5 @@ class ColumnParser : public BlockParser { } }; -class JsonLinesParser : public BlockParser { - public: - typedef std::function(const char*, const char*, - std::vector&)> - ParseFunc; - JsonLinesParser(std::unique_ptr stream, - const std::vector& field_specs, size_t block_size, size_t n_threads, - size_t n_header_lines, bool forgiving, int64_t max_err_msgs = 100) - : stream_(std::move(stream)), - field_specs_(field_specs), - forgiving_(forgiving), - max_errors_(max_err_msgs) { - init(block_size, n_threads, n_header_lines); - } - JsonLinesParser(const std::string& path, const std::vector& field_specs, - size_t block_size, size_t n_threads, size_t n_header_lines, bool forgiving, - int64_t max_err_msgs = 100) - : stream_(new fma_common::InputFmaStream(path)), - field_specs_(field_specs), - forgiving_(forgiving), - max_errors_(max_err_msgs) { - if (!stream_->Good()) { - LOG_INFO() << "Failed to open input file " << path; - throw std::runtime_error("failed to open input file [" + path + "]"); - } - init(block_size, n_threads, n_header_lines); - } - - ~JsonLinesParser() { parser_->Stop(); } - - bool ReadBlock(std::vector>& buf) { return parser_->ReadBlock(buf); } - - private: - void init(size_t block_size, size_t n_threads, size_t n_header_lines) { - parser_.reset(new fma_common::TextParser, ParseFunc>( - *stream_, - [this](const char* start, const char* end, std::vector& fds) { - return parse_jsonline(start, end, fds); - }, - block_size, n_threads, n_header_lines)); - } - - std::tuple parse_jsonline(const char* start, const char* end, - std::vector& fds) { - using namespace web; - using namespace boost; - size_t trim_count = 0; - const char* original_starting = start; - while (start < end && fma_common::TextParserUtils::IsTrimable(*start)) { - start++; - trim_count++; - } - if (start == end) { - return std::tuple(trim_count, false); - } - -#define SKIP_OR_THROW(except) \ - if (forgiving_) { \ - if (errors_++ < max_errors_) LOG_INFO() << except.what(); \ - while (start < end && !fma_common::TextParserUtils::IsNewLine(*start)) start++; \ - while (start < end && fma_common::TextParserUtils::IsNewLine(*start)) start++; \ - return std::tuple(start - original_starting, false); \ - } else { \ - std::throw_with_nested(except); \ - } - - // use stream parse to avoid memory copy - iostreams::stream istr(start, end - start); - std::error_code err_code; - json::value json_obj = json::value::parse(istr, err_code); - switch (err_code.value()) { - case 0: - break; - case 1: - { - istr.unget(); // hack - break; - } - default: - { - SKIP_OR_THROW(ParseJsonException(start, end, err_code.message())); - } - } - using namespace lgraph::field_data_helper; - try { - for (size_t column = 0; column < field_specs_.size(); column++) { - FieldSpec& field_spec = field_specs_[column]; - if (field_spec.name.empty()) { - continue; - } - if (json_obj.at(column).is_null() && field_spec.optional) { - fds.emplace_back(); - continue; - } - FieldData fd; - switch (field_spec.type) { - case FieldType::NUL: - FMA_ASSERT(false); - case FieldType::BOOL: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Bool(val.as_bool()); - } - break; - } - case FieldType::INT8: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int8(val.as_number().to_int32()); - } - break; - } - case FieldType::INT16: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int16(val.as_number().to_int32()); - } - break; - } - case FieldType::INT32: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int32(val.as_number().to_int32()); - } - break; - } - case FieldType::INT64: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int64(val.as_number().to_int64()); - } - break; - } - case FieldType::FLOAT: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Float(static_cast(val.as_double())); - } - break; - } - case FieldType::DOUBLE: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData( - str.data(), str.data() + str.size(), fd); - } else { - fd = FieldData::Double(val.as_double()); - } - break; - } - case FieldType::DATE: - fd = FieldData::Date(ToStdString(json_obj.at(column).as_string())); - break; - case FieldType::DATETIME: - fd = FieldData::DateTime(ToStdString(json_obj.at(column).as_string())); - break; - case FieldType::STRING: - fd = FieldData::String(ToStdString(json_obj.at(column).as_string())); - break; - case FieldType::BLOB: - fd = FieldData::Blob(ToStdString(json_obj.at(column).as_string())); - break; - case FieldType::POINT: - // TODO(shw): Support import for point type; - case FieldType::LINESTRING: - // TODO(shw): support import for linestring type; - case FieldType::POLYGON: - // TODO(shw): support import for polygon type; - case FieldType::SPATIAL: - // TODO(shw): support import for spatial type; - throw std::runtime_error("do not support spatial type now!"); - } - if (fd.is_null()) { - throw std::bad_cast(); - } - fds.emplace_back(std::move(fd)); - } - } catch (std::exception& e) { - SKIP_OR_THROW(JsonReadException(start, end, e.what())); - } catch (...) { - SKIP_OR_THROW(JsonReadException(start, end, "Unknown exception")); - } - return std::tuple(static_cast(istr.tellg()) + trim_count, true); - } - - std::unique_ptr stream_; - std::vector field_specs_; - std::unique_ptr, ParseFunc>> parser_; - bool forgiving_ = false; - int64_t errors_ = 0; - int64_t max_errors_ = 100; -#undef SKIP_OR_THROW -}; - } // namespace import_v2 } // namespace lgraph diff --git a/src/import/graphar_config.h b/src/import/graphar_config.h new file mode 100644 index 0000000000..84a4d2267f --- /dev/null +++ b/src/import/graphar_config.h @@ -0,0 +1,189 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#pragma once + +#include +#include +#include + +#include "tools/json.hpp" + +namespace lgraph { +namespace import_v3 { + +// Configuration of imported vertex or edge data properties field +typedef std::vector> Properties; + +/** + * @brief Parse the gar DataType to FieldType in config. + * + * @param[in] data_type The GraphAr DataType of the vetex or edge property. + * @param[out] type_name The FieldType string which used to make json object. + */ +inline void ParseType(const std::shared_ptr& data_type, + std::string& type_name) { + switch (data_type->id()) { + case GraphArchive::Type::BOOL: + type_name = "BOOL"; + break; + case GraphArchive::Type::INT32: + type_name = "INT32"; + break; + case GraphArchive::Type::INT64: + type_name = "INT64"; + break; + case GraphArchive::Type::FLOAT: + type_name = "FLOAT"; + break; + case GraphArchive::Type::DOUBLE: + type_name = "DOUBLE"; + break; + case GraphArchive::Type::STRING: + type_name = "STRING"; + break; + default: + THROW_CODE(InputError, "Unsupported data type error!"); + break; + } +} + +/** + * Traverse all properties of the vertex, get the primary key, the properties and the property + * names. Keep the original order in yml config. + * + * @param[in] ver_info The gar vertex information. + * @param[out] primary The primary key of the vertex. + * @param[out] props All the properties of the vertex. One of it maybe + * {"name":"id","type":"INT64"}. + * @param[out] prop_names All the property names of the vertex. One of it maybe "id". + */ +inline void WalkVertex(const GraphArchive::VertexInfo& ver_info, std::string& primary, + Properties& props, std::vector& prop_names) { + auto& ver_groups = ver_info.GetPropertyGroups(); + for (auto& ver_props : ver_groups) { + for (const auto& prop : ver_props->GetProperties()) { + if (prop.is_primary) primary = prop.name; + prop_names.emplace_back(std::move(prop.name)); + std::string type_name; + ParseType(prop.type, type_name); + props.emplace_back( + std::map{{"name", prop.name}, {"type", type_name}}); + } + } +} + +/** + * Traverse all properties of the edge, get the properties and the property names. + * Keep the original order in yml config. Similar to WalkVertex, but don't get primary. + * + * @param[in] edge_info The gar edge information. + * @param[out] props All the properties of the vertex. One of it maybe + * {"name":"id","type":"INT64"}. + * @param[out] prop_names All the property names of the vertex. One of it maybe "id". + */ +inline void WalkEdge(const GraphArchive::EdgeInfo& edge_info, Properties& props, + std::vector& prop_names) { + auto& edge_groups = edge_info.GetPropertyGroups(); + for (const auto& edge_props : edge_groups) { + for (const auto& prop : edge_props->GetProperties()) { + prop_names.emplace_back(std::move(prop.name)); + std::string type_name; + ParseType(prop.type, type_name); + props.emplace_back( + std::map{{"name", prop.name}, {"type", type_name}}); + } + } +} + +/** + * Compare the properties of two edges to see if they are equal. + * + * @param props1 The first edge properties. + * @param props2 The second edge properties. + * @return Result of the comparison. + */ +inline bool CheckEdgePropsEqual(const Properties& props1, const Properties& props2) { + json json1(props1); + json json2(props2); + return json1 == json2; +} + +/** + * @brief Read the gar yml file to construct the import config in json form. + * + * @param[out] gar_conf The json object of the import config used in import_v3. + * @param[in] path The location of gar yml file. + */ +inline void ParserGraphArConf(nlohmann::json& gar_conf, const std::string& path) { + auto graph_info = GraphArchive::GraphInfo::Load(path).value(); + gar_conf["schema"] = {}; + gar_conf["files"] = {}; + auto vertex_infos = graph_info->GetVertexInfos(); + for (const auto& vertex_info : vertex_infos) { + nlohmann::json schema_node; + schema_node["label"] = vertex_info->GetLabel(); + schema_node["type"] = "VERTEX"; + std::string primary; + Properties properties; + std::vector prop_names; + WalkVertex(*vertex_info, primary, properties, prop_names); + schema_node["primary"] = primary; + schema_node["properties"] = properties; + gar_conf["schema"].push_back(schema_node); + + nlohmann::json file_node; + file_node["path"] = path; + file_node["format"] = "GraphAr"; + file_node["label"] = vertex_info->GetLabel(); + file_node["columns"] = prop_names; + gar_conf["files"].push_back(file_node); + } + + auto edge_infos = graph_info->GetEdgeInfos(); + // The map of edge_label and its properties + std::unordered_map edge_labels; + for (const auto& edge_info : edge_infos) { + std::string label = edge_info->GetEdgeLabel(); + Properties properties; + std::vector prop_names = {"SRC_ID", "DST_ID"}; + WalkEdge(*edge_info, properties, prop_names); + if (!edge_labels.count(label)) { + edge_labels[label] = properties; + nlohmann::json schema_node; + schema_node["label"] = label; + schema_node["type"] = "EDGE"; + if (properties.size()) { + schema_node["properties"] = properties; + } + gar_conf["schema"].push_back(schema_node); + } else { + if (!CheckEdgePropsEqual(properties, edge_labels[label])) { + THROW_CODE(InputError, "The edge [" + label + "] has different properties!"); + } + } + + nlohmann::json file_node; + file_node["path"] = path; + file_node["format"] = "GraphAr"; + file_node["label"] = edge_info->GetEdgeLabel(); + file_node["SRC_ID"] = edge_info->GetSrcLabel(); + file_node["DST_ID"] = edge_info->GetDstLabel(); + file_node["columns"] = prop_names; + gar_conf["files"].push_back(file_node); + } +} + +} // namespace import_v3 +} // namespace lgraph diff --git a/src/import/graphar_parser.cpp b/src/import/graphar_parser.cpp new file mode 100644 index 0000000000..9aa3fd4d57 --- /dev/null +++ b/src/import/graphar_parser.cpp @@ -0,0 +1,179 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#include "import/graphar_parser.h" +#include "import/graphar_config.h" +#include "import/import_exception.h" + +namespace lgraph { +namespace import_v2 { + +// The map contains PrimaryMaps of all vertices as endpoints of edges +std::unordered_map GraphArParser::primary_maps = {}; + +GraphArParser::GraphArParser(const CsvDesc& cd) : cd_(cd) {} + +GraphArParser::~GraphArParser() {} + +/** + * @brief Traverse the vertex properties to get the primary key. + * @param ver_info The gar vertex information. + * @return The primary key of the vertex. + */ +GraphArchive::Property GraphArParser::GetPrimaryKey(const GraphArchive::VertexInfo& ver_info) { + const auto& ver_groups = ver_info.GetPropertyGroups(); + for (const auto& ver_props : ver_groups) + for (const auto& prop : ver_props->GetProperties()) + if (prop.is_primary) return prop; + THROW_CODE(InputError, "The primary key of [" + ver_info.GetLabel() + "] is not found!"); +} + +/** + * @brief Parse the gar data of a vertex or edge property to FieldData. + * @param T The GraphAr vertex data or edge data. + * @param prop The property name. + * @param data_type The GraphAr DataType of the vetex property. + * @return The converted FieldData. + */ +template +FieldData GraphArParser::ParseData(T& data, const std::string& prop, + const std::shared_ptr& type) { + switch (type->id()) { + case GraphArchive::Type::BOOL: + return FieldData(data.template property(prop).value()); + case GraphArchive::Type::INT32: + return FieldData(data.template property(prop).value()); + case GraphArchive::Type::INT64: + return FieldData(data.template property(prop).value()); + case GraphArchive::Type::FLOAT: + return FieldData(data.template property(prop).value()); + case GraphArchive::Type::DOUBLE: + return FieldData(data.template property(prop).value()); + case GraphArchive::Type::STRING: + return FieldData::String(data.template property(prop).value()); + default: + THROW_CODE(InputError, "Unsupported data type error!"); + break; + } + return FieldData(); +} + +/** + * Map the GraphAr vertex ID and the vertex primary property FieldData, to get the + * primary data. + * @param[out] primary_map To save the map. + * @param[in] graph_info The GraphAr graph information, to get the vertex information. + * @param[in] ver_label The vertex label to get the vertex information. + */ +void GraphArParser::MapIdPrimary(PrimaryMap& primary_map, + const std::shared_ptr& graph_info, + const std::string& ver_label) { + if (primary_maps.count(ver_label)) { + primary_map = primary_maps[ver_label]; + return; + } + auto ver_info = graph_info->GetVertexInfo(ver_label); + GraphArchive::Property ver_prop = GetPrimaryKey(*ver_info); + if (!ver_prop.is_primary) { + THROW_CODE(InputError, "the primary key of [" + ver_label + "] is not found!"); + } + auto vertices = GraphArchive::VerticesCollection::Make(graph_info, ver_label).value(); + for (auto it = vertices->begin(); it != vertices->end(); ++it) { + auto vertex = *it; + primary_map[vertex.id()] = + ParseData(vertex, ver_prop.name, ver_prop.type); + } + primary_maps[ver_label] = primary_map; +} + +/** + * @brief Get the adjacency list type of the edge + * + * @param edge_info An object to describe the edge information + * @return One of GraphArchive::AdjListType + */ +GraphArchive::AdjListType GraphArParser::GetOneAdjListType( + const GraphArchive::EdgeInfo& edge_info) { + if (edge_info.HasAdjacentListType(GraphArchive::AdjListType::unordered_by_source)) + return GraphArchive::AdjListType::unordered_by_source; + if (edge_info.HasAdjacentListType(GraphArchive::AdjListType::unordered_by_dest)) + return GraphArchive::AdjListType::unordered_by_dest; + if (edge_info.HasAdjacentListType(GraphArchive::AdjListType::ordered_by_source)) + return GraphArchive::AdjListType::ordered_by_source; + if (edge_info.HasAdjacentListType(GraphArchive::AdjListType::ordered_by_dest)) + return GraphArchive::AdjListType::ordered_by_dest; + THROW_CODE(InputError, "The GraphAr edge" + edge_info.GetEdgeLabel() + " has not AdjListType!"); +} + +/** + * Read the gar data via the CsvDesc, and parse the gar data to the block of FieldData. + * @param[out] buf The block import_v3 needed. + * @return Whether the read process has been finished. + */ +bool GraphArParser::ReadBlock(std::vector>& buf) { + if (label_read) return false; + label_read = true; + auto graph_info = GraphArchive::GraphInfo::Load(cd_.path).value(); + if (cd_.is_vertex_file) { + auto vertices = GraphArchive::VerticesCollection::Make(graph_info, cd_.label).value(); + auto ver_info = graph_info->GetVertexInfo(cd_.label); + for (auto it = vertices->begin(); it != vertices->end(); ++it) { + auto vertex = *it; + std::vector temp_vf; + for (std::string& prop : cd_.columns) { + auto data_type = ver_info->GetPropertyType(prop).value(); + FieldData fd = ParseData(vertex, prop, data_type); + temp_vf.emplace_back(std::move(fd)); + } + buf.emplace_back(std::move(temp_vf)); + } + return true; + } else { + auto edge_info = graph_info->GetEdgeInfo(cd_.edge_src.label, cd_.label, cd_.edge_dst.label); + auto edges = + GraphArchive::EdgesCollection::Make(graph_info, cd_.edge_src.label, cd_.label, + cd_.edge_dst.label, GetOneAdjListType(*edge_info)) + .value(); + + PrimaryMap src_id_primary; + PrimaryMap dst_id_primary; + MapIdPrimary(src_id_primary, graph_info, cd_.edge_src.label); + MapIdPrimary(dst_id_primary, graph_info, cd_.edge_dst.label); + + size_t edge_num = edges->size(); + buf.reserve(edge_num); + for (auto it = edges->begin(); it != edges->end(); ++it) { + auto edge = *it; + std::vector temp_vf; + for (auto& prop : cd_.columns) { + if (prop == "SRC_ID") { + FieldData src_data = src_id_primary[edge.source()]; + temp_vf.emplace_back(std::move(src_data)); + } else if (prop == "DST_ID") { + FieldData dst_data = dst_id_primary[edge.destination()]; + temp_vf.emplace_back(std::move(dst_data)); + } else { + auto data_type = edge_info->GetPropertyType(prop).value(); + FieldData edge_data = ParseData(edge, prop, data_type); + temp_vf.emplace_back(std::move(edge_data)); + } + } + buf.emplace_back(std::move(temp_vf)); + } + return true; + } +} + +} // namespace import_v2 +} // namespace lgraph diff --git a/src/import/graphar_parser.h b/src/import/graphar_parser.h new file mode 100644 index 0000000000..63cea2efd8 --- /dev/null +++ b/src/import/graphar_parser.h @@ -0,0 +1,58 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#pragma once + +#include +#include +#include + +#include "core/data_type.h" +#include "core/field_data_helper.h" +#include "import/block_parser.h" +#include "import/import_config_parser.h" + +namespace lgraph { +namespace import_v2 { + +// The map to get primary data of a vertex +typedef std::unordered_map PrimaryMap; + +// Parse gar file into a block of FieldData +class GraphArParser : public BlockParser { + protected: + CsvDesc cd_; // Schema definition and config + bool label_read = false; // Means the block has been read + + static std::unordered_map primary_maps; + + GraphArchive::Property GetPrimaryKey(const GraphArchive::VertexInfo&); + template + FieldData ParseData(T& data, const std::string& prop, + const std::shared_ptr& data_type); + void MapIdPrimary(PrimaryMap& primary_map, + const std::shared_ptr& graph_info, + const std::string& ver_label); + GraphArchive::AdjListType GetOneAdjListType(const GraphArchive::EdgeInfo &edge_info); + + public: + explicit GraphArParser(const CsvDesc& cd); + + bool ReadBlock(std::vector>& buf); + + ~GraphArParser(); +}; + +} // namespace import_v2 +} // namespace lgraph diff --git a/src/import/import_config_parser.h b/src/import/import_config_parser.h index b47eadd5fb..ef968fa4d8 100644 --- a/src/import/import_config_parser.h +++ b/src/import/import_config_parser.h @@ -958,9 +958,11 @@ class ImportConfParser { cd.size = fs::file_size(file); } cd.data_format = item["format"]; - if (cd.data_format != "CSV" && cd.data_format != "JSON") { + + if (cd.data_format != "CSV" && cd.data_format != "JSON" && + cd.data_format != "GraphAr") { THROW_CODE(InputError, - "\"format\" value error : {}, should be CSV or JSON in json {}", + "\"format\" value error : {}, should be CSV, JSON or GraphAr in json {}", cd.data_format, item.dump(4)); } cd.label = item["label"]; diff --git a/src/import/import_v2.h b/src/import/import_v2.h index 829bd7a335..a3141c6b9a 100644 --- a/src/import/import_v2.h +++ b/src/import/import_v2.h @@ -17,6 +17,7 @@ #include "core/lightning_graph.h" #include "core/type_convert.h" #include "import/column_parser.h" +#include "import/jsonlines_parser.h" #include "import/import_config_parser.h" #include "import/import_data_file.h" #include "import/import_planner.h" diff --git a/src/import/import_v3.cpp b/src/import/import_v3.cpp index 37db2add4c..d2e200cf42 100644 --- a/src/import/import_v3.cpp +++ b/src/import/import_v3.cpp @@ -20,6 +20,7 @@ #include "import/import_config_parser.h" #include "import/blob_writer.h" #include "import/import_utils.h" +#include "import/graphar_config.h" #include "db/galaxy.h" namespace lgraph { @@ -40,7 +41,7 @@ void Importer::OnErrorOffline(const std::string& msg) { LOG_WARN() << msg; if (!config_.continue_on_error) { LOG_WARN() << "If you wish to ignore the errors, use " - "--continue_on_error true"; + "--continue_on_error true"; if (!config_.import_online) { exit(-1); } else { @@ -60,9 +61,13 @@ void Importer::DoImportOffline() { Galaxy galaxy(Galaxy::Config{config_.db_dir, false, true, "fma.ai"}, true, gc); auto db = OpenGraph(galaxy, empty_db); db_ = &db; - std::ifstream ifs(config_.config_file); nlohmann::json conf; - ifs >> conf; + if (config_.is_graphar) { + ParserGraphArConf(conf, config_.config_file); + } else { + std::ifstream ifs(config_.config_file); + ifs >> conf; + } schemaDesc_ = import_v2::ImportConfParser::ParseSchema(conf); data_files_ = import_v2::ImportConfParser::ParseFiles(conf); @@ -72,10 +77,10 @@ void Importer::DoImportOffline() { .MeetEdgeConstraints(file.edge_src.label, file.edge_dst.label)) { throw std::runtime_error(FMA_FMT("{} not meet the edge constraints", file.path)); } - file.edge_src.id = schemaDesc_.FindVertexLabel( - file.edge_src.label).GetPrimaryField().name; - file.edge_dst.id = schemaDesc_.FindVertexLabel( - file.edge_dst.label).GetPrimaryField().name; + file.edge_src.id = + schemaDesc_.FindVertexLabel(file.edge_src.label).GetPrimaryField().name; + file.edge_dst.id = + schemaDesc_.FindVertexLabel(file.edge_dst.label).GetPrimaryField().name; } import_v2::ImportConfParser::CheckConsistent(schemaDesc_, file); } @@ -104,15 +109,14 @@ void Importer::DoImportOffline() { bool ok = db_->AddLabel(v.is_vertex, v.name, fds, *options); if (ok) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add {} label:{}", v.is_vertex ? "vertex" : "edge", - v.name); + LOG_INFO() << FMA_FMT("Add {} label:{}", v.is_vertex ? "vertex" : "edge", v.name); } else { - online_full_import_oss << FMA_FMT("Add {} label:{}\n", - v.is_vertex ? "vertex" : "edge", v.name); + online_full_import_oss + << FMA_FMT("Add {} label:{}\n", v.is_vertex ? "vertex" : "edge", v.name); } } else { - THROW_CODE(InputError, - "{} label:{} already exists", v.is_vertex ? "Vertex" : "Edge", v.name); + THROW_CODE(InputError, "{} label:{} already exists", v.is_vertex ? "Vertex" : "Edge", + v.name); } auto lid = db_->CreateReadTxn().GetLabelId(v.is_vertex, v.name); if (v.is_vertex) { @@ -147,64 +151,69 @@ void Importer::DoImportOffline() { // create index, ID column has creadted if (db_->AddVertexIndex(v.name, spec.name, spec.idxType)) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add vertex index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + LOG_INFO() + << FMA_FMT("Add vertex index [label:{}, field:{}, type:{}]", v.name, + spec.name, static_cast(spec.idxType)); } else { - online_full_import_oss << FMA_FMT("Add vertex index [label:{}, " - "field:{}, type:{}]\n", v.name, spec.name, - static_cast(spec.idxType)); + online_full_import_oss << FMA_FMT( + "Add vertex index [label:{}, " + "field:{}, type:{}]\n", + v.name, spec.name, static_cast(spec.idxType)); } - } else { - THROW_CODE(InputError, - "Vertex index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Vertex index [label:{}, field:{}] already exists", + v.name, spec.name); } } else if (v.is_vertex && spec.index && !spec.primary && spec.idxType != lgraph::IndexType::NonuniqueIndex) { THROW_CODE(InputError, - "offline import does not support to create a unique " - "index [label:{}, field:{}]. You should create an index for " - "an attribute column after the import is complete", - v.name, spec.name); + "offline import does not support to create a unique " + "index [label:{}, field:{}]. You should create an index for " + "an attribute column after the import is complete", + v.name, spec.name); } else if (!v.is_vertex && spec.index && (spec.idxType == lgraph::IndexType::NonuniqueIndex || - spec.idxType == lgraph::IndexType::PairUniqueIndex)) { + spec.idxType == lgraph::IndexType::PairUniqueIndex)) { if (db_->AddEdgeIndex(v.name, spec.name, spec.idxType)) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add edge index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + LOG_INFO() + << FMA_FMT("Add edge index [label:{}, field:{}, type:{}]", v.name, + spec.name, static_cast(spec.idxType)); } else { - online_full_import_oss << FMA_FMT("Add edge index [label:{}, field:{}," - " type:{}]\n", v.name, spec.name, static_cast(spec.idxType)); + online_full_import_oss << FMA_FMT( + "Add edge index [label:{}, field:{}," + " type:{}]\n", + v.name, spec.name, static_cast(spec.idxType)); } } else { - THROW_CODE(InputError, - "Edge index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Edge index [label:{}, field:{}] already exists", + v.name, spec.name); } } else if (!v.is_vertex && spec.index && spec.idxType == lgraph::IndexType::GlobalUniqueIndex) { THROW_CODE(InputError, - "offline import does not support to create an unique " - "index [label:{}, field:{}]. You should create an index for " - "an attribute column after the import is complete", - v.name, spec.name); + "offline import does not support to create an unique " + "index [label:{}, field:{}]. You should create an index for " + "an attribute column after the import is complete", + v.name, spec.name); } if (spec.fulltext) { bool ok = db_->AddFullTextIndex(v.is_vertex, v.name, spec.name); if (ok) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add fulltext index [{} label:{}, field:{}]", - v.is_vertex ? "vertex" : "edge", v.name, spec.name); + LOG_INFO() + << FMA_FMT("Add fulltext index [{} label:{}, field:{}]", + v.is_vertex ? "vertex" : "edge", v.name, spec.name); } else { - online_full_import_oss << FMA_FMT("Add fulltext index [{} label:{}, " - "field:{}]\n", v.is_vertex ? "vertex" : "edge", v.name, spec.name); + online_full_import_oss << FMA_FMT( + "Add fulltext index [{} label:{}, " + "field:{}]\n", + v.is_vertex ? "vertex" : "edge", v.name, spec.name); } } else { THROW_CODE(InputError, - "Fulltext index [{} label:{}, field:{}] already exists", - v.is_vertex ? "vertex" : "edge", v.name, spec.name); + "Fulltext index [{} label:{}, field:{}] already exists", + v.is_vertex ? "vertex" : "edge", v.name, spec.name); } } } @@ -234,16 +243,14 @@ void Importer::VertexDataToSST() { rocksdb::DB* db; auto s = rocksdb::DB::Open(options, vid_path_, &db); if (!s.ok()) { - throw std::runtime_error( - FMA_FMT("Opening DB failed, error: {}", s.ToString().c_str())); + throw std::runtime_error(FMA_FMT("Opening DB failed, error: {}", s.ToString().c_str())); } rocksdb_vids_.reset(db); } - parse_file_threads_ = std::make_unique( - config_.parse_file_threads); - generate_sst_threads_ = std::make_unique( - config_.generate_sst_threads); + parse_file_threads_ = std::make_unique(config_.parse_file_threads); + generate_sst_threads_ = + std::make_unique(config_.generate_sst_threads); struct VertexDataBlock { VertexId start_vid = 0; uint16_t key_col_id = 0; @@ -263,7 +270,7 @@ void Importer::VertexDataToSST() { if (!file.is_vertex_file) { continue; } - boost::asio::post(*parse_file_threads_, [this, &blob_writer, &pending_tasks, &file](){ + boost::asio::post(*parse_file_threads_, [this, &blob_writer, &pending_tasks, &file]() { try { std::vector fts; { @@ -288,7 +295,9 @@ void Importer::VertexDataToSST() { uint16_t key_col_id = file.FindIdxExcludeSkip( schemaDesc_.FindVertexLabel(file.label).GetPrimaryField().name); std::unique_ptr parser; - if (file.data_format == "CSV") { + if (file.data_format == "GraphAr") { + parser.reset(new import_v2::GraphArParser(file)); + } else if (file.data_format == "CSV") { parser.reset(new import_v2::ColumnParser( file.path, fts, config_.parse_block_size, config_.parse_block_threads, file.n_header_line, config_.continue_on_error, config_.delimiter, @@ -329,10 +338,10 @@ void Importer::VertexDataToSST() { std::vector vec_kvs; VertexId start_vid = dataBlock->start_vid; std::unique_ptr vertex_sst_writer( - new rocksdb::SstFileWriter(rocksdb::EnvOptions(), - {}, nullptr, false)); - std::string sst_path = sst_files_path_ + "/vertex_" + - std::to_string(start_vid); + new rocksdb::SstFileWriter(rocksdb::EnvOptions(), {}, nullptr, + false)); + std::string sst_path = + sst_files_path_ + "/vertex_" + std::to_string(start_vid); auto s = vertex_sst_writer->Open(sst_path); if (!s.ok()) { throw std::runtime_error( @@ -395,7 +404,7 @@ void Importer::VertexDataToSST() { }); } s = vertex_sst_writer->Put({(const char*)&vid, sizeof(vid)}, - {value.Data(), value.Size()}); + {value.Data(), value.Size()}); if (!s.ok()) { throw std::runtime_error( FMA_FMT("vertex_sst_writer.Put error, {}", s.ToString())); @@ -416,10 +425,9 @@ void Importer::VertexDataToSST() { if (!config_.keep_vid_in_memory && !vec_kvs.empty()) { rocksdb::SstFileWriter vid_sst_writer(rocksdb::EnvOptions(), {}, - nullptr, false); - s = vid_sst_writer.Open( - sst_files_path_ + "/vid_" + - std::to_string(dataBlock->start_vid)); + nullptr, false); + s = vid_sst_writer.Open(sst_files_path_ + "/vid_" + + std::to_string(dataBlock->start_vid)); if (!s.ok()) { throw std::runtime_error( FMA_FMT("Failed to open vid_sst_writer")); @@ -470,7 +478,7 @@ void Importer::VertexDataToSST() { if (!config_.keep_vid_in_memory) { auto begin = fma_common::GetTime(); std::vector ingest_files; - for (const auto & entry : std::filesystem::directory_iterator(sst_files_path_)) { + for (const auto& entry : std::filesystem::directory_iterator(sst_files_path_)) { const auto& path = entry.path().string(); if (path.find("vid_") != std::string::npos) { ingest_files.push_back(path); @@ -502,8 +510,9 @@ void Importer::VertexDataToSST() { if (!config_.import_online) { LOG_INFO() << "vids CompactRange, time: " << fma_common::GetTime() - begin << "s"; } else { - online_full_import_oss << "vids CompactRange, time: " + - std::to_string(fma_common::GetTime() - begin) + "s\n"; + online_full_import_oss + << "vids CompactRange, time: " + std::to_string(fma_common::GetTime() - begin) + + "s\n"; } } @@ -518,10 +527,9 @@ void Importer::VertexDataToSST() { void Importer::EdgeDataToSST() { auto t1 = fma_common::GetTime(); - parse_file_threads_ = std::make_unique( - config_.parse_file_threads); - generate_sst_threads_ = std::make_unique( - config_.generate_sst_threads); + parse_file_threads_ = std::make_unique(config_.parse_file_threads); + generate_sst_threads_ = + std::make_unique(config_.generate_sst_threads); struct EdgeDataBlock { std::vector> block; uint64_t start_eid = 0; @@ -539,7 +547,7 @@ void Importer::EdgeDataToSST() { struct KV { std::string key; Value value; - KV(std::string k, Value v): key(std::move(k)), value(std::move(v)) {} + KV(std::string k, Value v) : key(std::move(k)), value(std::move(v)) {} }; BufferedBlobWriter blob_writer(db_->GetLightningGraph()); std::atomic pending_tasks(0); @@ -550,9 +558,8 @@ void Importer::EdgeDataToSST() { if (file.is_vertex_file) { continue; } - boost::asio::post(*parse_file_threads_, - [this, &file, &blob_writer, &pending_tasks, - &sst_file_id, &unique_index_keys](){ + boost::asio::post(*parse_file_threads_, [this, &file, &blob_writer, &pending_tasks, + &sst_file_id, &unique_index_keys]() { try { std::vector fts; { @@ -594,7 +601,9 @@ void Importer::EdgeDataToSST() { } } std::unique_ptr parser; - if (file.data_format == "CSV") { + if (file.data_format == "GraphAr") { + parser.reset(new import_v2::GraphArParser(file)); + } else if (file.data_format == "CSV") { parser.reset(new import_v2::ColumnParser( file.path, fts, config_.parse_block_size, config_.parse_block_threads, file.n_header_line, config_.continue_on_error, config_.delimiter, @@ -650,10 +659,9 @@ void Importer::EdgeDataToSST() { pending_tasks--; uint64_t num = ++sst_file_id; std::unique_ptr sst_file_writer( - new rocksdb::SstFileWriter(rocksdb::EnvOptions(), - {}, nullptr, false)); - std::string sst_path = sst_files_path_ + "/edge_" + - std::to_string(num); + new rocksdb::SstFileWriter(rocksdb::EnvOptions(), {}, nullptr, + false)); + std::string sst_path = sst_files_path_ + "/edge_" + std::to_string(num); auto s = sst_file_writer->Open(sst_path); if (!s.ok()) { throw std::runtime_error(FMA_FMT("failed to open sst_file_writer")); @@ -709,8 +717,8 @@ void Importer::EdgeDataToSST() { continue; } auto slice_k = vid_iter->key(); - auto offset = slice_k.data() + - (slice_k.size() - sizeof(VertexId)); + auto offset = + slice_k.data() + (slice_k.size() - sizeof(VertexId)); src_vid = *(VertexId*)offset; slice_k.remove_suffix(sizeof(VertexId)); if (slice_k.compare(k) != 0) { @@ -746,8 +754,8 @@ void Importer::EdgeDataToSST() { continue; } auto slice_k = vid_iter->key(); - auto offset = slice_k.data() + - (slice_k.size() - sizeof(VertexId)); + auto offset = + slice_k.data() + (slice_k.size() - sizeof(VertexId)); dst_vid = *(VertexId*)offset; slice_k.remove_suffix(sizeof(VertexId)); if (slice_k.compare(k) != 0) { @@ -770,8 +778,8 @@ void Importer::EdgeDataToSST() { if (unique_index_col.IsString() && unique_index_col.string().size() > lgraph::_detail::MAX_KEY_SIZE) { - OnErrorOffline("Unique index string key is too long: " - + unique_index_col.string().substr(0, 1024)); + OnErrorOffline("Unique index string key is too long: " + + unique_index_col.string().substr(0, 1024)); continue; } std::string unique_key; @@ -849,7 +857,7 @@ void Importer::EdgeDataToSST() { [](const KV& a, const KV& b) { return a.key < b.key; }); for (auto& pair : vec_kvs) { sst_file_writer->Put(pair.key, - {pair.value.Data(), pair.value.Size()}); + {pair.value.Data(), pair.value.Size()}); } std::vector().swap(vec_kvs); sst_file_writer->Finish(); @@ -906,7 +914,7 @@ void Importer::VertexPrimaryIndexToLmdb() { LabelId preLabelId = std::numeric_limits::max(); auto txn = db_->CreateWriteTxn(); VertexIndex* vertexIndex = nullptr; - auto write_index = [&](const std::string& key, VertexId vid){ + auto write_index = [&](const std::string& key, VertexId vid) { FMA_DBG_CHECK(key.size() > sizeof(LabelId)); LabelId labelId = *((LabelId*)key.data()); if (labelId != preLabelId) { @@ -1038,8 +1046,7 @@ void Importer::VertexPrimaryIndexToLmdb() { } rocksdb_vids_.reset(nullptr); } -typedef std::vector>::iterator IT; +typedef std::vector>::iterator IT; void Importer::RocksdbToLmdb() { auto t1 = fma_common::GetTime(); std::unique_ptr rocksdb; @@ -1060,7 +1067,7 @@ void Importer::RocksdbToLmdb() { rocksdb.reset(db); } std::vector ingest_files; - for (const auto & entry : std::filesystem::directory_iterator(sst_files_path_)) { + for (const auto& entry : std::filesystem::directory_iterator(sst_files_path_)) { ingest_files.push_back(entry.path().generic_string()); } if (ingest_files.empty()) { @@ -1069,7 +1076,8 @@ void Importer::RocksdbToLmdb() { if (!config_.import_online) { exit(-1); } else { - throw std::runtime_error("no sst files are created, " + throw std::runtime_error( + "no sst files are created, " "please check if the input vertex and edge files are valid"); } } @@ -1092,8 +1100,8 @@ void Importer::RocksdbToLmdb() { if (!config_.import_online) { LOG_INFO() << "CompactRange, time: " << fma_common::GetTime() - begin << "s"; } else { - online_full_import_oss << "CompactRange, time: " + - std::to_string(fma_common::GetTime() - begin) + "s\n"; + online_full_import_oss + << "CompactRange, time: " + std::to_string(fma_common::GetTime() - begin) + "s\n"; } } if (!config_.keep_vid_in_memory) { @@ -1125,16 +1133,15 @@ void Importer::RocksdbToLmdb() { std::atomic pending_tasks(0); std::atomic stage(0); - std::unique_ptr lmdb_writer( - new boost::asio::thread_pool(1)); + std::unique_ptr lmdb_writer(new boost::asio::thread_pool(1)); std::unique_ptr rocksdb_readers( new boost::asio::thread_pool(config_.read_rocksdb_threads)); for (uint16_t i = 0; i < config_.read_rocksdb_threads; i++) { - boost::asio::post(*rocksdb_readers, [this, i, &pending_tasks, - &rocksdb, &lmdb_writer, &stage]() { + boost::asio::post(*rocksdb_readers, [this, i, &pending_tasks, &rocksdb, &lmdb_writer, + &stage]() { uint64_t start_vid = i * config_.vid_num_per_reading; uint64_t bigend_start_vid = boost::endian::native_to_big(start_vid); - uint64_t end_vid = (i+1) * config_.vid_num_per_reading; + uint64_t end_vid = (i + 1) * config_.vid_num_per_reading; uint64_t bigend_end_vid = boost::endian::native_to_big(end_vid); rocksdb::ReadOptions options; options.ignore_range_deletions = true; @@ -1157,52 +1164,51 @@ void Importer::RocksdbToLmdb() { VertexId pre_vid = InvalidVid; EdgeUid last_uid(-1, -1, 0, -1, -1); - auto throw_kvs_to_lmdb = [&lmdb_writer, &pending_tasks, this, &stage, i] - (std::vector> kvs, - std::vector> v_property, - std::vector> e_property){ - while (stage != i || pending_tasks > 1) { - fma_common::SleepUs(1000); - } - if (kvs.empty()) { - return; - } - pending_tasks++; - boost::asio::post(*lmdb_writer, [this, &pending_tasks, - kvs = std::move(kvs), - v_property = std::move(v_property), - e_property = std::move(e_property)]() { - Transaction txn = db_->CreateWriteTxn(); - for (auto& kv : kvs) { - txn.ImportAppendDataRaw(kv.first, kv.second); - } - txn.RefreshNextVid(); - for (auto& pro : v_property) { - auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), true)); - s->AddDetachedVertexProperty( - txn.GetTxn(), std::get<1>(pro), std::get<2>(pro)); + auto throw_kvs_to_lmdb = + [&lmdb_writer, &pending_tasks, this, &stage, i]( + std::vector> kvs, + std::vector> v_property, + std::vector> e_property) { + while (stage != i || pending_tasks > 1) { + fma_common::SleepUs(1000); } - for (auto& pro : e_property) { - auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), false)); - s->AddDetachedEdgeProperty( - txn.GetTxn(), std::get<1>(pro), std::get<2>(pro)); + if (kvs.empty()) { + return; } - txn.Commit(); - pending_tasks--; - }); - }; + pending_tasks++; + boost::asio::post(*lmdb_writer, [this, &pending_tasks, kvs = std::move(kvs), + v_property = std::move(v_property), + e_property = std::move(e_property)]() { + Transaction txn = db_->CreateWriteTxn(); + for (auto& kv : kvs) { + txn.ImportAppendDataRaw(kv.first, kv.second); + } + txn.RefreshNextVid(); + for (auto& pro : v_property) { + auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), true)); + s->AddDetachedVertexProperty(txn.GetTxn(), std::get<1>(pro), + std::get<2>(pro)); + } + for (auto& pro : e_property) { + auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), false)); + s->AddDetachedEdgeProperty(txn.GetTxn(), std::get<1>(pro), + std::get<2>(pro)); + } + txn.Commit(); + pending_tasks--; + }); + }; auto make_kvs = [&]() { if (!split) { lgraph::graph::VertexValue vov(GetConstRef(vdata)); IT next_beg; lgraph::graph::EdgeValue oev(outs.begin(), outs.end(), out_last_lid, - out_last_tid, out_last_dst, out_last_eid, - next_beg, true); + out_last_tid, out_last_dst, out_last_eid, next_beg, + true); FMA_DBG_ASSERT(next_beg == outs.end()); outs.clear(); - lgraph::graph::EdgeValue iev(ins.begin(), ins.end(), in_last_lid, - in_last_tid, in_last_dst, in_last_eid, - next_beg, true); + lgraph::graph::EdgeValue iev(ins.begin(), ins.end(), in_last_lid, in_last_tid, + in_last_dst, in_last_eid, next_beg, true); FMA_DBG_ASSERT(next_beg == ins.end()); ins.clear(); Value data; @@ -1217,8 +1223,7 @@ void Importer::RocksdbToLmdb() { out_last_tid, out_last_dst, out_last_eid, next_start, true); FMA_DBG_ASSERT(next_start == outs.end()); - kvs.emplace_back(oev.CreateOutEdgeKey(pre_vid), - std::move(oev.GetBuf())); + kvs.emplace_back(oev.CreateOutEdgeKey(pre_vid), std::move(oev.GetBuf())); all_kv_size += kvs.back().first.Size() + kvs.back().second.Size(); outs.clear(); } @@ -1228,8 +1233,7 @@ void Importer::RocksdbToLmdb() { in_last_tid, in_last_dst, in_last_eid, next_start, true); FMA_DBG_ASSERT(next_start == ins.end()); - kvs.emplace_back(iev.CreateInEdgeKey(pre_vid), - std::move(iev.GetBuf())); + kvs.emplace_back(iev.CreateInEdgeKey(pre_vid), std::move(iev.GetBuf())); all_kv_size += kvs.back().first.Size() + kvs.back().second.Size(); ins.clear(); } @@ -1250,8 +1254,7 @@ void Importer::RocksdbToLmdb() { while (true) { pre_vid = InvalidVid; for (iter->Seek({(const char*)&bigend_start_vid, sizeof(bigend_start_vid)}); - iter->Valid(); - iter->Next()) { + iter->Valid(); iter->Next()) { auto key = iter->key(); if (key.compare({(const char*)&bigend_end_vid, sizeof(bigend_end_vid)}) >= 0) { if (pre_vid != InvalidVid) { @@ -1275,8 +1278,7 @@ void Importer::RocksdbToLmdb() { if (pre_vid != InvalidVid) { make_kvs(); if (all_kv_size > config_.max_size_per_reading) { - throw_kvs_to_lmdb(std::move(kvs), - std::move(vertex_property), + throw_kvs_to_lmdb(std::move(kvs), std::move(vertex_property), std::move(edge_property)); all_kv_size = 0; } @@ -1285,8 +1287,8 @@ void Importer::RocksdbToLmdb() { } if (vlid_detach_.at(lid)) { // detach property - vertex_property.emplace_back( - lid, vid, Value::MakeCopy(val.data(), val.size())); + vertex_property.emplace_back(lid, vid, + Value::MakeCopy(val.data(), val.size())); vdata = import_v2::DenseString((const char*)(&lid), sizeof(LabelId)); } else { vdata = import_v2::DenseString(val.data(), val.size()); @@ -1320,17 +1322,15 @@ void Importer::RocksdbToLmdb() { uid.lid = labelId; uid.dst = vertexId; uid.tid = tid; - if (last_uid.src == uid.src && - last_uid.lid == uid.lid && - last_uid.dst == uid.dst && - last_uid.tid == uid.tid) { + if (last_uid.src == uid.src && last_uid.lid == uid.lid && + last_uid.dst == uid.dst && last_uid.tid == uid.tid) { uid.eid = last_uid.eid + 1; } else { uid.eid = 0; } last_uid = uid; - edge_property.emplace_back( - labelId, uid, Value::MakeCopy(val.data(), val.size())); + edge_property.emplace_back(labelId, uid, + Value::MakeCopy(val.data(), val.size())); outs.emplace_back(labelId, tid, vertexId, import_v2::DenseString()); } else { outs.emplace_back(labelId, tid, vertexId, @@ -1352,10 +1352,9 @@ void Importer::RocksdbToLmdb() { } if (!outs.empty()) { IT next_start; - lgraph::graph::EdgeValue oev(outs.begin(), outs.end(), - out_last_lid, out_last_tid, - out_last_dst, out_last_eid, - next_start, true); + lgraph::graph::EdgeValue oev(outs.begin(), outs.end(), out_last_lid, + out_last_tid, out_last_dst, + out_last_eid, next_start, true); FMA_DBG_ASSERT(next_start == outs.end()); kvs.emplace_back(oev.CreateOutEdgeKey(pre_vid), std::move(oev.GetBuf())); @@ -1375,8 +1374,7 @@ void Importer::RocksdbToLmdb() { } if (all_kv_size > config_.max_size_per_reading) { - throw_kvs_to_lmdb(std::move(kvs), - std::move(vertex_property), + throw_kvs_to_lmdb(std::move(kvs), std::move(vertex_property), std::move(edge_property)); all_kv_size = 0; } @@ -1392,8 +1390,7 @@ void Importer::RocksdbToLmdb() { } else { if (pre_vid != InvalidVid) { make_kvs(); - throw_kvs_to_lmdb(std::move(kvs), - std::move(vertex_property), + throw_kvs_to_lmdb(std::move(kvs), std::move(vertex_property), std::move(edge_property)); all_kv_size = 0; } @@ -1409,8 +1406,8 @@ void Importer::RocksdbToLmdb() { if (!config_.import_online) { LOG_INFO() << "Dump rocksdb into lmdb, time: " << t2 - t1 << "s"; } else { - online_full_import_oss << "Dump rocksdb into lmdb, time: " + - std::to_string(t2 - t1) + "s\n"; + online_full_import_oss << "Dump rocksdb into lmdb, time: " + std::to_string(t2 - t1) + + "s\n"; } } @@ -1460,9 +1457,7 @@ AccessControlledDB Importer::OpenGraph(Galaxy& galaxy, bool empty_db) { return galaxy.OpenGraph(config_.user, config_.graph); } -std::string Importer::OnlineFullImportLog() { - return online_full_import_oss.str(); -} +std::string Importer::OnlineFullImportLog() { return online_full_import_oss.str(); } } // namespace import_v3 } // namespace lgraph diff --git a/src/import/import_v3.h b/src/import/import_v3.h index 283859edd2..96898f3f9c 100644 --- a/src/import/import_v3.h +++ b/src/import/import_v3.h @@ -21,6 +21,8 @@ #include "core/lightning_graph.h" #include "core/type_convert.h" #include "import/column_parser.h" +#include "import/jsonlines_parser.h" +#include "import/graphar_parser.h" #include "import/import_config_parser.h" #include "import/dense_string.h" @@ -45,6 +47,7 @@ class Importer { std::string db_dir = "./lgraph_db"; // db data dir to use std::string user = "admin"; std::string password = "73@TuGraph"; + bool is_graphar = false; // import Graphar file, config file must be absolute path std::string graph = "default"; // graph name bool delete_if_exists = false; // force import, delete data if already exists bool continue_on_error = false; // whether to continue when there are data errors diff --git a/src/import/jsonlines_parser.h b/src/import/jsonlines_parser.h new file mode 100644 index 0000000000..f71f25d15b --- /dev/null +++ b/src/import/jsonlines_parser.h @@ -0,0 +1,258 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#pragma once + +#include +#include + +#include "core/data_type.h" +#include "core/field_data_helper.h" +#include "import/block_parser.h" +#include "import/import_config_parser.h" +#include "restful/server/json_convert.h" +#include "tools/json.hpp" + +namespace lgraph { +namespace import_v2 { + +// Parse a jsonline into a vector of FieldData +class JsonLinesParser : public BlockParser { + public: + typedef std::function(const char*, const char*, + std::vector&)> + ParseFunc; + JsonLinesParser(std::unique_ptr stream, + const std::vector& field_specs, size_t block_size, size_t n_threads, + size_t n_header_lines, bool forgiving, int64_t max_err_msgs = 100) + : stream_(std::move(stream)), + field_specs_(field_specs), + forgiving_(forgiving), + max_errors_(max_err_msgs) { + init(block_size, n_threads, n_header_lines); + } + JsonLinesParser(const std::string& path, const std::vector& field_specs, + size_t block_size, size_t n_threads, size_t n_header_lines, bool forgiving, + int64_t max_err_msgs = 100) + : stream_(new fma_common::InputFmaStream(path)), + field_specs_(field_specs), + forgiving_(forgiving), + max_errors_(max_err_msgs) { + if (!stream_->Good()) { + LOG_INFO() << "Failed to open input file " << path; + throw std::runtime_error("failed to open input file [" + path + "]"); + } + init(block_size, n_threads, n_header_lines); + } + + ~JsonLinesParser() { parser_->Stop(); } + + bool ReadBlock(std::vector>& buf) { return parser_->ReadBlock(buf); } + + private: + void init(size_t block_size, size_t n_threads, size_t n_header_lines) { + parser_.reset(new fma_common::TextParser, ParseFunc>( + *stream_, + [this](const char* start, const char* end, std::vector& fds) { + return parse_jsonline(start, end, fds); + }, + block_size, n_threads, n_header_lines)); + } + + std::tuple parse_jsonline(const char* start, const char* end, + std::vector& fds) { + using namespace web; + using namespace boost; + size_t trim_count = 0; + const char* original_starting = start; + while (start < end && fma_common::TextParserUtils::IsTrimable(*start)) { + start++; + trim_count++; + } + if (start == end) { + return std::tuple(trim_count, false); + } + +#define SKIP_OR_THROW(except) \ + if (forgiving_) { \ + if (errors_++ < max_errors_) LOG_INFO() << except.what(); \ + while (start < end && !fma_common::TextParserUtils::IsNewLine(*start)) start++; \ + while (start < end && fma_common::TextParserUtils::IsNewLine(*start)) start++; \ + return std::tuple(start - original_starting, false); \ + } else { \ + std::throw_with_nested(except); \ + } + + // use stream parse to avoid memory copy + iostreams::stream istr(start, end - start); + std::error_code err_code; + json::value json_obj = json::value::parse(istr, err_code); + switch (err_code.value()) { + case 0: + break; + case 1: + { + istr.unget(); // hack + break; + } + default: + { + SKIP_OR_THROW(ParseJsonException(start, end, err_code.message())); + } + } + using namespace lgraph::field_data_helper; + try { + for (size_t column = 0; column < field_specs_.size(); column++) { + FieldSpec& field_spec = field_specs_[column]; + if (field_spec.name.empty()) { + continue; + } + if (json_obj.at(column).is_null() && field_spec.optional) { + fds.emplace_back(); + continue; + } + FieldData fd; + switch (field_spec.type) { + case FieldType::NUL: + FMA_ASSERT(false); + case FieldType::BOOL: + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Bool(val.as_bool()); + } + break; + } + case FieldType::INT8: + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int8(val.as_number().to_int32()); + } + break; + } + case FieldType::INT16: + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int16(val.as_number().to_int32()); + } + break; + } + case FieldType::INT32: + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int32(val.as_number().to_int32()); + } + break; + } + case FieldType::INT64: + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int64(val.as_number().to_int64()); + } + break; + } + case FieldType::FLOAT: + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Float(static_cast(val.as_double())); + } + break; + } + case FieldType::DOUBLE: + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData( + str.data(), str.data() + str.size(), fd); + } else { + fd = FieldData::Double(val.as_double()); + } + break; + } + case FieldType::DATE: + fd = FieldData::Date(ToStdString(json_obj.at(column).as_string())); + break; + case FieldType::DATETIME: + fd = FieldData::DateTime(ToStdString(json_obj.at(column).as_string())); + break; + case FieldType::STRING: + fd = FieldData::String(ToStdString(json_obj.at(column).as_string())); + break; + case FieldType::BLOB: + fd = FieldData::Blob(ToStdString(json_obj.at(column).as_string())); + break; + case FieldType::POINT: + // TODO(shw): Support import for point type; + case FieldType::LINESTRING: + // TODO(shw): support import for linestring type; + case FieldType::POLYGON: + // TODO(shw): support import for polygon type; + case FieldType::SPATIAL: + // TODO(shw): support import for spatial type; + throw std::runtime_error("do not support spatial type now!"); + } + if (fd.is_null()) { + throw std::bad_cast(); + } + fds.emplace_back(std::move(fd)); + } + } catch (std::exception& e) { + SKIP_OR_THROW(JsonReadException(start, end, e.what())); + } catch (...) { + SKIP_OR_THROW(JsonReadException(start, end, "Unknown exception")); + } + return std::tuple(static_cast(istr.tellg()) + trim_count, true); + } + + std::unique_ptr stream_; + std::vector field_specs_; + std::unique_ptr, ParseFunc>> parser_; + bool forgiving_ = false; + int64_t errors_ = 0; + int64_t max_errors_ = 100; +#undef SKIP_OR_THROW +}; + +} // namespace import_v2 +} // namespace lgraph diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 64e380bd2f..0d80acb8d8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -58,6 +58,7 @@ add_executable(unit_test test_import_planner.cpp test_import_v2.cpp test_import_v3.cpp + test_import_gar.cpp test_type_convert.cpp test_json_convert.cpp test_lgraph.cpp @@ -143,7 +144,8 @@ target_link_libraries(unit_test lgraph_server_lib geax_isogql bolt - librocksdb.a) + librocksdb.a + gar) target_link_libraries(unit_test ${GTEST_MAIN_LIB} diff --git a/test/integration/test_import_gar.py b/test/integration/test_import_gar.py new file mode 100644 index 0000000000..871a0c374a --- /dev/null +++ b/test/integration/test_import_gar.py @@ -0,0 +1,50 @@ +import pytest +import logging +from pathlib import Path + +log = logging.getLogger(__name__) + +class TestImportGar: + config_path = Path.cwd().parent.parent / "test/resource/data/gar_test/ldbc_parquet/ldbc_sample.graph.yml" + + IMPORTOPT = {"cmd":f"./lgraph_import -c {config_path} --gar true --overwrite true --d gar_db", + "cleanup_dir":["./gar_db"]} + + SERVEROPT = {"cmd":"./lgraph_server -c lgraph_standalone.json --directory ./gar_db --port 27070 --rpc_port 27071 --log_dir '' ", + "cleanup_dir":["./gar_db"]} + + CLIENTOPT = {"host":"http://127.0.0.1:27071/LGraphHttpService/Query/", "user":"admin", "password":"73@TuGraph"} + + @pytest.mark.parametrize("importor", [IMPORTOPT], indirect=True) + @pytest.mark.parametrize("server", [SERVEROPT], indirect=True) + @pytest.mark.parametrize("rest_client", [CLIENTOPT], indirect=True) + def test_import_gar(self, importor, server, rest_client): + # test vertex label + vertex_label_res = rest_client.call_cypher("default", "CALL db.vertexLabels()") + assert len(vertex_label_res) == 1 + assert vertex_label_res[0]['label'] == 'person' + + # test edge label + edge_label_res = rest_client.call_cypher("default", "CALL db.edgeLabels()") + assert len(edge_label_res) == 1 + assert edge_label_res[0]['label'] == 'knows' + + # test vertex count + vertex_count = rest_client.call_cypher("default", "MATCH (p:person) RETURN count(p)") + assert vertex_count[0]['count(p)'] == 903 + + # text edge count + edge_count = rest_client.call_cypher("default", "MATCH ()-[r:knows]-() RETURN count(r)") + assert edge_count[0]['count(r)'] == 6626 * 2 + + # text vertex keys + vertex_keys = rest_client.call_cypher("default", "MATCH (p:person) RETURN keys(p) LIMIT 1") + assert "id" in vertex_keys[0]['keys(p)'] + assert "firstName" in vertex_keys[0]['keys(p)'] + assert "lastName" in vertex_keys[0]['keys(p)'] + assert "gender" in vertex_keys[0]['keys(p)'] + assert len(vertex_keys[0]['keys(p)']) == 30 + + # test edge has 'creationDate' key + edge_has_key = rest_client.call_cypher("default", "MATCH ()-[r]-() RETURN exists(r.creationDate) LIMIT 1") + assert edge_has_key[0]['{EXISTS(r.creationDate)}'] == True diff --git a/test/resource/data/gar_test/edge_test/actor.vertex.yml b/test/resource/data/gar_test/edge_test/actor.vertex.yml new file mode 100644 index 0000000000..f30562352a --- /dev/null +++ b/test/resource/data/gar_test/edge_test/actor.vertex.yml @@ -0,0 +1,10 @@ +label: actor +chunk_size: 4096 +prefix: vertex/actor/ +property_groups: + - file_type: parquet + properties: + - name: id + data_type: int64 + is_primary: true +version: gar/v1 diff --git a/test/resource/data/gar_test/edge_test/actor_comment_movie.edge.yml b/test/resource/data/gar_test/edge_test/actor_comment_movie.edge.yml new file mode 100644 index 0000000000..cf23cd7c34 --- /dev/null +++ b/test/resource/data/gar_test/edge_test/actor_comment_movie.edge.yml @@ -0,0 +1,19 @@ +src_label: actor +edge_label: comment +dst_label: movie +chunk_size: 1024 +src_chunk_size: 100 +dst_chunk_size: 100 +directed: false +prefix: edge/actor_comment_movie/ +adj_lists: + - ordered: true + aligned_by: src + file_type: parquet +property_groups: + - file_type: parquet + properties: + - name: diffculty + data_type: string + is_primary: false +version: gar/v1 diff --git a/test/resource/data/gar_test/edge_test/movie.graph.yml b/test/resource/data/gar_test/edge_test/movie.graph.yml new file mode 100644 index 0000000000..e63ad58342 --- /dev/null +++ b/test/resource/data/gar_test/edge_test/movie.graph.yml @@ -0,0 +1,9 @@ +name: movie +vertices: + - viewer.vertex.yml + - actor.vertex.yml + - movie.vertex.yml +edges: + - viewer_comment_movie.edge.yml + - actor_comment_movie.edge.yml +version: gar/v1 diff --git a/test/resource/data/gar_test/edge_test/movie.vertex.yml b/test/resource/data/gar_test/edge_test/movie.vertex.yml new file mode 100644 index 0000000000..0512ea8ad0 --- /dev/null +++ b/test/resource/data/gar_test/edge_test/movie.vertex.yml @@ -0,0 +1,10 @@ +label: movie +chunk_size: 4096 +prefix: vertex/movie/ +property_groups: + - file_type: parquet + properties: + - name: id + data_type: string + is_primary: true +version: gar/v1 \ No newline at end of file diff --git a/test/resource/data/gar_test/edge_test/viewer.vertex.yml b/test/resource/data/gar_test/edge_test/viewer.vertex.yml new file mode 100644 index 0000000000..3ebc44d900 --- /dev/null +++ b/test/resource/data/gar_test/edge_test/viewer.vertex.yml @@ -0,0 +1,10 @@ +label: viewer +chunk_size: 4096 +prefix: vertex/viewer/ +property_groups: + - file_type: parquet + properties: + - name: id + data_type: int64 + is_primary: true +version: gar/v1 diff --git a/test/resource/data/gar_test/edge_test/viewer_comment_movie.edge.yml b/test/resource/data/gar_test/edge_test/viewer_comment_movie.edge.yml new file mode 100644 index 0000000000..a16ed6ecde --- /dev/null +++ b/test/resource/data/gar_test/edge_test/viewer_comment_movie.edge.yml @@ -0,0 +1,19 @@ +src_label: viewer +edge_label: comment +dst_label: movie +chunk_size: 1024 +src_chunk_size: 100 +dst_chunk_size: 100 +directed: false +prefix: edge/viewer_comment_movie/ +adj_lists: + - ordered: true + aligned_by: src + file_type: parquet +property_groups: + - file_type: parquet + properties: + - name: rate + data_type: string + is_primary: false +version: gar/v1 diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0 new file mode 100644 index 0000000000..2d1153951b Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part1/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part1/chunk0 new file mode 100644 index 0000000000..5f140146ea Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part1/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part2/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part2/chunk0 new file mode 100644 index 0000000000..d0309a82bf Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part2/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part2/chunk1 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part2/chunk1 new file mode 100644 index 0000000000..109b260389 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part2/chunk1 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part3/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part3/chunk0 new file mode 100644 index 0000000000..946ee95635 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part3/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part4/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part4/chunk0 new file mode 100644 index 0000000000..eb1dcf063d Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part4/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part5/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part5/chunk0 new file mode 100644 index 0000000000..dbc7776644 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part5/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part6/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part6/chunk0 new file mode 100644 index 0000000000..5aada1c2b5 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part6/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part7/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part7/chunk0 new file mode 100644 index 0000000000..2dc13a3b8d Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part7/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part8/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part8/chunk0 new file mode 100644 index 0000000000..f91307faee Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part8/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part9/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part9/chunk0 new file mode 100644 index 0000000000..ffa93f4d9c Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/adj_list/part9/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part0/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part0/chunk0 new file mode 100644 index 0000000000..3ec532a671 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part0/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part1/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part1/chunk0 new file mode 100644 index 0000000000..05e5e10a85 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part1/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part2/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part2/chunk0 new file mode 100644 index 0000000000..1b221d21e1 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part2/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part2/chunk1 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part2/chunk1 new file mode 100644 index 0000000000..89141ff116 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part2/chunk1 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part3/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part3/chunk0 new file mode 100644 index 0000000000..1e21b862e7 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part3/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part4/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part4/chunk0 new file mode 100644 index 0000000000..8d8aafa019 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part4/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part5/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part5/chunk0 new file mode 100644 index 0000000000..4cdff17710 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part5/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part6/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part6/chunk0 new file mode 100644 index 0000000000..0e8c74b9e6 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part6/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part7/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part7/chunk0 new file mode 100644 index 0000000000..bb5745cf08 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part7/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part8/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part8/chunk0 new file mode 100644 index 0000000000..4ff4c49676 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part8/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part9/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part9/chunk0 new file mode 100644 index 0000000000..3f6ecb567e Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/creationDate/part9/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count0 new file mode 100644 index 0000000000..fd12e09195 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count1 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count1 new file mode 100644 index 0000000000..a384772a7e Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count1 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count2 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count2 new file mode 100644 index 0000000000..255c67ef16 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count2 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count3 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count3 new file mode 100644 index 0000000000..a6f3cd580a Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count3 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count4 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count4 new file mode 100644 index 0000000000..0aa991b93c Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count4 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count5 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count5 new file mode 100644 index 0000000000..3bf0211ce1 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count5 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count6 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count6 new file mode 100644 index 0000000000..ba21895fcd Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count6 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count7 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count7 new file mode 100644 index 0000000000..118a695f4e Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count7 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count8 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count8 new file mode 100644 index 0000000000..fbb8a16d77 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count8 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count9 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count9 new file mode 100644 index 0000000000..f1dba9d238 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/edge_count9 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk0 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk0 new file mode 100644 index 0000000000..3895554085 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk1 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk1 new file mode 100644 index 0000000000..c30a201e31 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk1 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk2 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk2 new file mode 100644 index 0000000000..6e5006e58b Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk2 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk3 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk3 new file mode 100644 index 0000000000..726a0f5d12 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk3 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk4 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk4 new file mode 100644 index 0000000000..04b0b64ac6 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk4 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk5 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk5 new file mode 100644 index 0000000000..1d794efdaa Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk5 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk6 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk6 new file mode 100644 index 0000000000..0e3b49994a Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk6 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk7 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk7 new file mode 100644 index 0000000000..b140d83c99 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk7 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk8 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk8 new file mode 100644 index 0000000000..f52149cd40 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk8 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk9 b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk9 new file mode 100644 index 0000000000..52e6c0a2b0 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/offset/chunk9 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/vertex_count b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/vertex_count new file mode 100644 index 0000000000..9830fbeab8 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/edge/person_knows_person/ordered_by_source/vertex_count differ diff --git a/test/resource/data/gar_test/ldbc_parquet/ldbc_sample.graph.yml b/test/resource/data/gar_test/ldbc_parquet/ldbc_sample.graph.yml new file mode 100644 index 0000000000..aeb8691f60 --- /dev/null +++ b/test/resource/data/gar_test/ldbc_parquet/ldbc_sample.graph.yml @@ -0,0 +1,6 @@ +name: ldbc_sample +vertices: + - person.vertex.yml +edges: + - person_knows_person.edge.yml +version: gar/v1 diff --git a/test/resource/data/gar_test/ldbc_parquet/person.vertex.yml b/test/resource/data/gar_test/ldbc_parquet/person.vertex.yml new file mode 100644 index 0000000000..4c4a277fff --- /dev/null +++ b/test/resource/data/gar_test/ldbc_parquet/person.vertex.yml @@ -0,0 +1,21 @@ +label: person +chunk_size: 100 +prefix: vertex/person/ +property_groups: + - properties: + - name: id + data_type: int64 + is_primary: true + file_type: parquet + - properties: + - name: firstName + data_type: string + is_primary: false + - name: lastName + data_type: string + is_primary: false + - name: gender + data_type: string + is_primary: false + file_type: parquet +version: gar/v1 diff --git a/test/resource/data/gar_test/ldbc_parquet/person_knows_person.edge.yml b/test/resource/data/gar_test/ldbc_parquet/person_knows_person.edge.yml new file mode 100644 index 0000000000..d7b81ea12d --- /dev/null +++ b/test/resource/data/gar_test/ldbc_parquet/person_knows_person.edge.yml @@ -0,0 +1,19 @@ +src_label: person +edge_label: knows +dst_label: person +chunk_size: 1024 +src_chunk_size: 100 +dst_chunk_size: 100 +directed: false +prefix: edge/person_knows_person/ +adj_lists: + - ordered: true + aligned_by: src + file_type: parquet +property_groups: + - file_type: parquet + properties: + - name: creationDate + data_type: string + is_primary: false +version: gar/v1 diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk0 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk0 new file mode 100644 index 0000000000..985e3fadd1 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk1 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk1 new file mode 100644 index 0000000000..03a74e36ec Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk1 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk2 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk2 new file mode 100644 index 0000000000..45d40c58ea Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk2 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk3 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk3 new file mode 100644 index 0000000000..fe70911fe0 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk3 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk4 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk4 new file mode 100644 index 0000000000..ba0d861759 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk4 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk5 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk5 new file mode 100644 index 0000000000..e1a1f906fa Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk5 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk6 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk6 new file mode 100644 index 0000000000..9411f5964b Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk6 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk7 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk7 new file mode 100644 index 0000000000..e96c254122 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk7 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk8 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk8 new file mode 100644 index 0000000000..ed93f6e378 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk8 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk9 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk9 new file mode 100644 index 0000000000..d08125d3a6 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/firstName_lastName_gender/chunk9 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk0 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk0 new file mode 100644 index 0000000000..3f73643df2 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk0 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk1 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk1 new file mode 100644 index 0000000000..da7ed507bd Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk1 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk2 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk2 new file mode 100644 index 0000000000..e5955d126a Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk2 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk3 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk3 new file mode 100644 index 0000000000..e9c5630f0d Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk3 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk4 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk4 new file mode 100644 index 0000000000..bfa0b97293 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk4 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk5 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk5 new file mode 100644 index 0000000000..d047db53a8 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk5 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk6 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk6 new file mode 100644 index 0000000000..8f7deba277 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk6 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk7 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk7 new file mode 100644 index 0000000000..be91fdc417 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk7 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk8 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk8 new file mode 100644 index 0000000000..d679fafd65 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk8 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk9 b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk9 new file mode 100644 index 0000000000..36ba2d71c5 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/id/chunk9 differ diff --git a/test/resource/data/gar_test/ldbc_parquet/vertex/person/vertex_count b/test/resource/data/gar_test/ldbc_parquet/vertex/person/vertex_count new file mode 100644 index 0000000000..9830fbeab8 Binary files /dev/null and b/test/resource/data/gar_test/ldbc_parquet/vertex/person/vertex_count differ diff --git a/test/test_import_gar.cpp b/test/test_import_gar.cpp new file mode 100644 index 0000000000..270415d541 --- /dev/null +++ b/test/test_import_gar.cpp @@ -0,0 +1,68 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#include +#include "fma-common/configuration.h" +#include "fma-common/string_formatter.h" + +#include "import/import_v3.h" +#include "import/graphar_config.h" + +#include "gtest/gtest.h" +#include "./test_tools.h" + +using namespace fma_common; +using namespace lgraph; +using namespace import_v3; + +class TestImportGar : public TuGraphTest {}; + +// The path "/test/resource/data/gar_test/edge_test" is for TestEdgeLabel +// The path "/test/resource/data/gar_test/ldbc_parquet" is for TestGarData + +TEST_F(TestImportGar, TestEdgeLabel) { + // reject the same edge label with different properties + Importer::Config config; + std::string tugraph_path = std::filesystem::current_path().parent_path().parent_path(); + UT_LOG() << tugraph_path; + config.config_file = tugraph_path + "/test/resource/data/gar_test/edge_test/movie.graph.yml"; + config.is_graphar = true; + config.delete_if_exists = true; + + nlohmann::json conf; + UT_EXPECT_ANY_THROW(ParserGraphArConf(conf, config.config_file)); +} + +TEST_F(TestImportGar, TestGarData) { + UT_LOG() << "Read gar data"; + Importer::Config config; + std::string tugraph_path = std::filesystem::current_path().parent_path().parent_path(); + UT_LOG() << tugraph_path; + config.config_file = + tugraph_path + "/test/resource/data/gar_test/ldbc_parquet/ldbc_sample.graph.yml"; + config.is_graphar = true; + config.delete_if_exists = true; + + nlohmann::json conf; + ParserGraphArConf(conf, config.config_file); + + // test the first vertex data in gar config + std::vector data_files = import_v2::ImportConfParser::ParseFiles(conf); + import_v2::GraphArParser parser = import_v2::GraphArParser(data_files.front()); + std::vector> block; + UT_EXPECT_NO_THROW(parser.ReadBlock(block)); + UT_EXPECT_EQ(block.size(), 903); + UT_EXPECT_EQ(block[0][0].ToString(), "933"); + UT_EXPECT_EQ(block[0][1].ToString(), "Mahinda"); +} diff --git a/toolkits/CMakeLists.txt b/toolkits/CMakeLists.txt index 780ed97cf7..969285f092 100644 --- a/toolkits/CMakeLists.txt +++ b/toolkits/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.13) +cmake_minimum_required(VERSION 3.16) project(TuGraph C CXX) ############### lgraph_import ###################### @@ -9,14 +9,16 @@ add_executable(${TARGET_LGRAPH_IMPORT} lgraph_import.cpp ${LGRAPH_ROOT_DIR}/src/client/cpp/rpc/lgraph_rpc_client.cpp ${LGRAPH_ROOT_DIR}/src/client/cpp/restful/rest_client.cpp - ${LGRAPH_ROOT_DIR}/src/import/import_client.cpp) + ${LGRAPH_ROOT_DIR}/src/import/import_client.cpp + ${LGRAPH_ROOT_DIR}/src/import/graphar_parser.cpp) # TODO: break circular dependency between cypher and state machine target_link_libraries(${TARGET_LGRAPH_IMPORT} lgraph_server_lib lgraph_cypher_lib lgraph_server_lib - librocksdb.a) + librocksdb.a + gar) if (NOT ENABLE_ASAN) target_link_libraries(${TARGET_LGRAPH_IMPORT} libjemalloc.a) diff --git a/toolkits/lgraph_import.cpp b/toolkits/lgraph_import.cpp index abc6372cf7..deda0cdadf 100644 --- a/toolkits/lgraph_import.cpp +++ b/toolkits/lgraph_import.cpp @@ -180,10 +180,10 @@ int main(int argc, char** argv) { config.Add(import_config.delimiter, "delimiter", true) .Comment("Delimiter used in the CSV files"); config.Add(import_config.enable_fulltext_index, "enable_fulltext_index", true) - .Comment("Whether to enable fulltext index"); + .Comment("Whether to enable fulltext index"); config.Add(import_config.fulltext_index_analyzer, "fulltext_index_analyzer", true) - .SetPossibleValues({"SmartChineseAnalyzer", "StandardAnalyzer"}) - .Comment("fulltext index analyzer"); + .SetPossibleValues({"SmartChineseAnalyzer", "StandardAnalyzer"}) + .Comment("fulltext index analyzer"); config.ExitAfterHelp(true); try { config.ParseAndFinalize(argc, argv); @@ -199,6 +199,8 @@ int main(int argc, char** argv) { config.Add(import_config_v3.config_file, "c,config_file", false) .Comment("Config file path"); config.Add(import_config_v3.continue_on_error, "i,continue_on_error", true); + config.Add(import_config_v3.is_graphar, "gar", true) + .Comment("Whether the config file is GraphAr file"); config.Add(import_config_v3.db_dir, "d,dir", true).Comment("The DB data directory"); config.Add(import_config_v3.user, "u,user", true).Comment("DB username."); config.Add(import_config_v3.password, "p,password", true).Comment("DB password.");