diff --git a/flex/codegen/src/hqps/hqps_scan_builder.h b/flex/codegen/src/hqps/hqps_scan_builder.h index 6756b9aec909..a4a282ea127b 100644 --- a/flex/codegen/src/hqps/hqps_scan_builder.h +++ b/flex/codegen/src/hqps/hqps_scan_builder.h @@ -55,7 +55,7 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR = /// 4. vertex label /// 5. oid static constexpr const char* SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR = - "auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n"; + "auto %1% = Engine::template ScanVertexWithOid<%2%,%3%>(%4%, %5%, %6%);\n"; /// Args /// 1. res_ctx_name @@ -179,14 +179,20 @@ class ScanOpBuilder { switch (const_value.item_case()) { case common::Value::kI32: oid_ = std::to_string(const_value.i32()); + oid_type_name_ = "int32_t"; break; case common::Value::kI64: oid_ = std::to_string(const_value.i64()); + oid_type_name_ = "int64_t"; break; + case common::Value::kStr: + oid_ = const_value.str(); + oid_type_name_ = "std::string_view"; default: LOG(FATAL) << "Currently only support int, long as primary key"; } - VLOG(1) << "Found oid: " << oid_ << " in index scan"; + VLOG(1) << "Found oid: " << oid_ + << " in index scan, type: " << oid_type_name_; } else { // dynamic param auto dyn_param_pb = triplet.param(); @@ -209,7 +215,7 @@ class ScanOpBuilder { // If oid_ not empty, scan with oid if (!oid_.empty()) { VLOG(1) << "Scan with oid: " << oid_; - return scan_with_oid(labels_ids_, oid_); + return scan_with_oid(labels_ids_, oid_, oid_type_name_); } else { // If no oid, scan without expression VLOG(1) << "Scan without expression"; @@ -220,15 +226,16 @@ class ScanOpBuilder { private: std::string scan_with_oid(const std::vector& label_ids, - const std::string& oid) const { + const std::string& oid, + const std::string& oid_type_name) const { VLOG(10) << "Scan with oid: " << oid; std::string next_ctx_name = ctx_.GetCurCtxName(); auto append_opt = res_alias_to_append_opt(res_alias_); if (label_ids.size() == 1) { boost::format formater(SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR); - formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] % - oid; + formater % next_ctx_name % append_opt % oid_type_name % ctx_.GraphVar() % + label_ids[0] % oid; return formater.str(); } else { boost::format formater(SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR); @@ -302,6 +309,7 @@ class ScanOpBuilder { std::string expr_var_name_, expr_func_name_, expr_construct_params_, selectors_str_; // The expression decode from params. std::string oid_; // the oid decode from idx predicate, or param name. + std::string oid_type_name_; int res_alias_; }; diff --git a/flex/engines/graph_db/app/server_app.cc b/flex/engines/graph_db/app/server_app.cc index 67d705bc0eb6..4e518d1747a0 100644 --- a/flex/engines/graph_db/app/server_app.cc +++ b/flex/engines/graph_db/app/server_app.cc @@ -22,7 +22,7 @@ static uint32_t get_vertex_vid(const gs::ReadTransaction& txn, uint8_t label, uint32_t vid = std::numeric_limits::max(); auto vit = txn.GetVertexIterator(label); for (; vit.IsValid(); vit.Next()) { - if (vit.GetId() == id) { + if (vit.GetId().AsInt64() == id) { vid = vit.GetIndex(); break; } @@ -99,7 +99,7 @@ bool ServerApp::Query(Decoder& input, Encoder& output) { uint8_t vertex_label_id = txn.schema().get_vertex_label_id(vertex_label); auto vit = txn.GetVertexIterator(vertex_label_id); for (; vit.IsValid(); vit.Next()) { - if (vit.GetId() == vertex_id) { + if (vit.GetId().AsInt64() == vertex_id) { output.put_int(1); int field_num = vit.FieldNum(); for (int i = 0; i < field_num; ++i) { @@ -233,13 +233,13 @@ bool ServerApp::Query(Decoder& input, Encoder& output) { std::vector> match_edges; for (uint32_t v = dst_range.from; v != dst_range.to; ++v) { - int64_t v_oid = txn.GetVertexId(dst_label_id, v); + int64_t v_oid = txn.GetVertexId(dst_label_id, v).AsInt64(); auto ieit = txn.GetInEdgeIterator(dst_label_id, v, src_label_id, edge_label_id); while (ieit.IsValid()) { uint32_t u = ieit.GetNeighbor(); if (src_range.contains(u)) { - int64_t u_oid = txn.GetVertexId(src_label_id, u); + int64_t u_oid = txn.GetVertexId(src_label_id, u).AsInt64(); match_edges.emplace_back(u_oid, v_oid, ieit.GetData().to_string()); } @@ -248,13 +248,13 @@ bool ServerApp::Query(Decoder& input, Encoder& output) { } if (match_edges.empty()) { for (uint32_t u = src_range.from; u != src_range.to; ++u) { - int64_t u_oid = txn.GetVertexId(src_label_id, u); + int64_t u_oid = txn.GetVertexId(src_label_id, u).AsInt64(); auto oeit = txn.GetOutEdgeIterator(src_label_id, u, dst_label_id, edge_label_id); while (oeit.IsValid()) { uint32_t v = oeit.GetNeighbor(); if (dst_range.contains(v)) { - int64_t v_oid = txn.GetVertexId(dst_label_id, v); + int64_t v_oid = txn.GetVertexId(dst_label_id, v).AsInt64(); match_edges.emplace_back(u_oid, v_oid, oeit.GetData().to_string()); } diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 84d609d7c808..f22f0094b3f5 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -65,8 +65,18 @@ std::shared_ptr GraphDBSession::get_vertex_property_column( std::shared_ptr GraphDBSession::get_vertex_id_column( uint8_t label) const { - return std::make_shared>( - db_.graph().lf_indexers_[label].get_keys(), StorageStrategy::kMem); + if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kInt64) { + return std::make_shared>( + dynamic_cast&>( + db_.graph().lf_indexers_[label].get_keys())); + } else if (db_.graph().lf_indexers_[label].get_type() == + PropertyType::kString) { + return std::make_shared>( + dynamic_cast&>( + db_.graph().lf_indexers_[label].get_keys())); + } else { + return nullptr; + } } #define likely(x) __builtin_expect(!!(x), 1) diff --git a/flex/engines/graph_db/database/insert_transaction.cc b/flex/engines/graph_db/database/insert_transaction.cc index 049443ee503d..c8f2ef75d16a 100644 --- a/flex/engines/graph_db/database/insert_transaction.cc +++ b/flex/engines/graph_db/database/insert_transaction.cc @@ -34,10 +34,11 @@ InsertTransaction::InsertTransaction(MutablePropertyFragment& graph, InsertTransaction::~InsertTransaction() { Abort(); } -bool InsertTransaction::AddVertex(label_t label, oid_t id, +bool InsertTransaction::AddVertex(label_t label, const Any& id, const std::vector& props) { size_t arc_size = arc_.GetSize(); - arc_ << static_cast(0) << label << id; + arc_ << static_cast(0) << label; + serialize_field(arc_, id); const std::vector& types = graph_.schema().get_vertex_properties(label); if (types.size() != props.size()) { @@ -65,15 +66,15 @@ bool InsertTransaction::AddVertex(label_t label, oid_t id, return true; } -bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label, - oid_t dst, label_t edge_label, - const Any& prop) { +bool InsertTransaction::AddEdge(label_t src_label, const Any& src, + label_t dst_label, const Any& dst, + label_t edge_label, const Any& prop) { vid_t lid; if (!graph_.get_lid(src_label, src, lid)) { if (added_vertices_.find(std::make_pair(src_label, src)) == added_vertices_.end()) { std::string label_name = graph_.schema().get_vertex_label_name(src_label); - LOG(ERROR) << "Source vertex " << label_name << "[" << src + LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string() << "] not found..."; return false; } @@ -82,8 +83,8 @@ bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label, if (added_vertices_.find(std::make_pair(dst_label, dst)) == added_vertices_.end()) { std::string label_name = graph_.schema().get_vertex_label_name(dst_label); - LOG(ERROR) << "Destination vertex " << label_name << "[" << dst - << "] not found..."; + LOG(ERROR) << "Destination vertex " << label_name << "[" + << dst.to_string() << "] not found..."; return false; } } @@ -95,8 +96,11 @@ bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label, << type << ", got " << prop.type; return false; } - arc_ << static_cast(1) << src_label << src << dst_label << dst - << edge_label; + arc_ << static_cast(1) << src_label; + serialize_field(arc_, src); + arc_ << dst_label; + serialize_field(arc_, dst); + arc_ << edge_label; serialize_field(arc_, prop); return true; } @@ -143,17 +147,17 @@ void InsertTransaction::IngestWal(MutablePropertyFragment& graph, arc >> op_type; if (op_type == 0) { label_t label; - oid_t id; - - arc >> label >> id; + Any id; + label = deserialize_oid(graph, arc, id); vid_t lid = graph.add_vertex(label, id); graph.get_vertex_table(label).ingest(lid, arc); } else if (op_type == 1) { label_t src_label, dst_label, edge_label; - oid_t src, dst; + Any src, dst; vid_t src_lid, dst_lid; - - arc >> src_label >> src >> dst_label >> dst >> edge_label; + src_label = deserialize_oid(graph, arc, src); + dst_label = deserialize_oid(graph, arc, dst); + arc >> edge_label; CHECK(get_vertex_with_retries(graph, src_label, src, src_lid)); CHECK(get_vertex_with_retries(graph, dst_label, dst, dst_lid)); @@ -177,7 +181,7 @@ void InsertTransaction::clear() { #define likely(x) __builtin_expect(!!(x), 1) bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph, - label_t label, oid_t oid, + label_t label, const Any& oid, vid_t& lid) { if (likely(graph.get_lid(label, oid, lid))) { return true; @@ -189,7 +193,7 @@ bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph, } } - LOG(ERROR) << "get_vertex [" << oid << "] failed"; + LOG(ERROR) << "get_vertex [" << oid.to_string() << "] failed"; return false; } diff --git a/flex/engines/graph_db/database/insert_transaction.h b/flex/engines/graph_db/database/insert_transaction.h index cad35bac1d4c..a9816975dbae 100644 --- a/flex/engines/graph_db/database/insert_transaction.h +++ b/flex/engines/graph_db/database/insert_transaction.h @@ -37,10 +37,10 @@ class InsertTransaction { ~InsertTransaction(); - bool AddVertex(label_t label, oid_t id, const std::vector& props); + bool AddVertex(label_t label, const Any& id, const std::vector& props); - bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst, - label_t edge_label, const Any& prop); + bool AddEdge(label_t src_label, const Any& src, label_t dst_label, + const Any& dst, label_t edge_label, const Any& prop); void Commit(); @@ -55,11 +55,12 @@ class InsertTransaction { void clear(); static bool get_vertex_with_retries(MutablePropertyFragment& graph, - label_t label, oid_t oid, vid_t& lid); + label_t label, const Any& oid, + vid_t& lid); grape::InArchive arc_; - std::set> added_vertices_; + std::set> added_vertices_; MutablePropertyFragment& graph_; ArenaAllocator& alloc_; diff --git a/flex/engines/graph_db/database/read_transaction.cc b/flex/engines/graph_db/database/read_transaction.cc index 473da018fef5..1bad63335b4c 100644 --- a/flex/engines/graph_db/database/read_transaction.cc +++ b/flex/engines/graph_db/database/read_transaction.cc @@ -41,7 +41,7 @@ void ReadTransaction::vertex_iterator::Goto(vid_t target) { cur_ = std::min(target, num_); } -oid_t ReadTransaction::vertex_iterator::GetId() const { +Any ReadTransaction::vertex_iterator::GetId() const { return graph_.get_oid(label_, cur_); } vid_t ReadTransaction::vertex_iterator::GetIndex() const { return cur_; } @@ -89,8 +89,8 @@ ReadTransaction::vertex_iterator ReadTransaction::GetVertexIterator( return {label, 0, graph_.vertex_num(label), graph_}; } -ReadTransaction::vertex_iterator ReadTransaction::FindVertex(label_t label, - oid_t id) const { +ReadTransaction::vertex_iterator ReadTransaction::FindVertex( + label_t label, const Any& id) const { vid_t lid; if (graph_.get_lid(label, id, lid)) { return {label, lid, graph_.vertex_num(label), graph_}; @@ -99,7 +99,7 @@ ReadTransaction::vertex_iterator ReadTransaction::FindVertex(label_t label, } } -bool ReadTransaction::GetVertexIndex(label_t label, oid_t id, +bool ReadTransaction::GetVertexIndex(label_t label, const Any& id, vid_t& index) const { return graph_.get_lid(label, id, index); } @@ -108,7 +108,7 @@ vid_t ReadTransaction::GetVertexNum(label_t label) const { return graph_.vertex_num(label); } -oid_t ReadTransaction::GetVertexId(label_t label, vid_t index) const { +Any ReadTransaction::GetVertexId(label_t label, vid_t index) const { return graph_.get_oid(label, index); } diff --git a/flex/engines/graph_db/database/read_transaction.h b/flex/engines/graph_db/database/read_transaction.h index bd03f0111894..b1c72e6abd80 100644 --- a/flex/engines/graph_db/database/read_transaction.h +++ b/flex/engines/graph_db/database/read_transaction.h @@ -143,7 +143,7 @@ class ReadTransaction { void Next(); void Goto(vid_t target); - oid_t GetId() const; + Any GetId() const; vid_t GetIndex() const; Any GetField(int col_id) const; @@ -183,13 +183,13 @@ class ReadTransaction { vertex_iterator GetVertexIterator(label_t label) const; - vertex_iterator FindVertex(label_t label, oid_t id) const; + vertex_iterator FindVertex(label_t label, const Any& id) const; - bool GetVertexIndex(label_t label, oid_t id, vid_t& index) const; + bool GetVertexIndex(label_t label, const Any& id, vid_t& index) const; vid_t GetVertexNum(label_t label) const; - oid_t GetVertexId(label_t label, vid_t index) const; + Any GetVertexId(label_t label, vid_t index) const; edge_iterator GetOutEdgeIterator(label_t label, vid_t u, label_t neighnor_label, diff --git a/flex/engines/graph_db/database/single_edge_insert_transaction.cc b/flex/engines/graph_db/database/single_edge_insert_transaction.cc index ba241c6557ab..2e18f9b1441e 100644 --- a/flex/engines/graph_db/database/single_edge_insert_transaction.cc +++ b/flex/engines/graph_db/database/single_edge_insert_transaction.cc @@ -37,18 +37,18 @@ SingleEdgeInsertTransaction::SingleEdgeInsertTransaction( SingleEdgeInsertTransaction::~SingleEdgeInsertTransaction() { Abort(); } -bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, oid_t src, - label_t dst_label, oid_t dst, +bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, const Any& src, + label_t dst_label, const Any& dst, label_t edge_label, const Any& prop) { if (!graph_.get_lid(src_label, src, src_vid_)) { std::string label_name = graph_.schema().get_vertex_label_name(src_label); - LOG(ERROR) << "Source vertex " << label_name << "[" << src + LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string() << "] not found..."; return false; } if (!graph_.get_lid(dst_label, dst, dst_vid_)) { std::string label_name = graph_.schema().get_vertex_label_name(dst_label); - LOG(ERROR) << "Destination vertex " << label_name << "[" << dst + LOG(ERROR) << "Destination vertex " << label_name << "[" << dst.to_string() << "] not found..."; return false; } @@ -63,8 +63,11 @@ bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, oid_t src, src_label_ = src_label; dst_label_ = dst_label; edge_label_ = edge_label; - arc_ << static_cast(1) << src_label << src << dst_label << dst - << edge_label; + arc_ << static_cast(1) << src_label; + serialize_field(arc_, src); + arc_ << dst_label; + serialize_field(arc_, dst); + arc_ << edge_label; serialize_field(arc_, prop); return true; } @@ -93,8 +96,16 @@ void SingleEdgeInsertTransaction::Commit() { logger_.append(arc_.GetBuffer(), arc_.GetSize()); grape::OutArchive arc; - arc.SetSlice(arc_.GetBuffer() + sizeof(WalHeader) + 20, - arc_.GetSize() - sizeof(WalHeader) - 20); + { + arc.SetSlice(arc_.GetBuffer() + sizeof(WalHeader), + arc_.GetSize() - sizeof(WalHeader)); + label_t op_type, label; + Any temp; + arc >> op_type; + deserialize_oid(graph_, arc, temp); + deserialize_oid(graph_, arc, temp); + arc >> label; + } graph_.IngestEdge(src_label_, src_vid_, dst_label_, dst_vid_, edge_label_, timestamp_, arc, alloc_); vm_.release_insert_timestamp(timestamp_); diff --git a/flex/engines/graph_db/database/single_edge_insert_transaction.h b/flex/engines/graph_db/database/single_edge_insert_transaction.h index ab4710acfe7e..eff701074b75 100644 --- a/flex/engines/graph_db/database/single_edge_insert_transaction.h +++ b/flex/engines/graph_db/database/single_edge_insert_transaction.h @@ -34,8 +34,8 @@ class SingleEdgeInsertTransaction { VersionManager& vm, timestamp_t timestamp); ~SingleEdgeInsertTransaction(); - bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst, - label_t edge_label, const Any& prop); + bool AddEdge(label_t src_label, const Any& src, label_t dst_label, + const Any& dst, label_t edge_label, const Any& prop); void Abort(); diff --git a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc index 50f9bfc97ded..ba26598968a2 100644 --- a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc +++ b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc @@ -36,10 +36,11 @@ SingleVertexInsertTransaction::SingleVertexInsertTransaction( } SingleVertexInsertTransaction::~SingleVertexInsertTransaction() { Abort(); } -bool SingleVertexInsertTransaction::AddVertex(label_t label, oid_t id, +bool SingleVertexInsertTransaction::AddVertex(label_t label, const Any& id, const std::vector& props) { size_t arc_size = arc_.GetSize(); - arc_ << static_cast(0) << label << id; + arc_ << static_cast(0) << label; + serialize_field(arc_, id); const std::vector& types = graph_.schema().get_vertex_properties(label); if (types.size() != props.size()) { @@ -68,23 +69,23 @@ bool SingleVertexInsertTransaction::AddVertex(label_t label, oid_t id, return true; } -bool SingleVertexInsertTransaction::AddEdge(label_t src_label, oid_t src, - label_t dst_label, oid_t dst, +bool SingleVertexInsertTransaction::AddEdge(label_t src_label, const Any& src, + label_t dst_label, const Any& dst, label_t edge_label, const Any& prop) { vid_t src_vid, dst_vid; if (src == added_vertex_id_ && src_label == added_vertex_label_) { if (!graph_.get_lid(dst_label, dst, dst_vid)) { std::string label_name = graph_.schema().get_vertex_label_name(dst_label); - LOG(ERROR) << "Destination vertex " << label_name << "[" << dst - << "] not found..."; + LOG(ERROR) << "Destination vertex " << label_name << "[" + << dst.to_string() << "] not found..."; return false; } src_vid = std::numeric_limits::max(); } else if (dst == added_vertex_id_ && dst_label == added_vertex_label_) { if (!graph_.get_lid(src_label, src, src_vid)) { std::string label_name = graph_.schema().get_vertex_label_name(src_label); - LOG(ERROR) << "Source vertex " << label_name << "[" << src + LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string() << "] not found..."; return false; } @@ -92,13 +93,13 @@ bool SingleVertexInsertTransaction::AddEdge(label_t src_label, oid_t src, } else { if (!graph_.get_lid(dst_label, dst, dst_vid)) { std::string label_name = graph_.schema().get_vertex_label_name(dst_label); - LOG(ERROR) << "Destination vertex " << label_name << "[" << dst - << "] not found..."; + LOG(ERROR) << "Destination vertex " << label_name << "[" + << dst.to_string() << "] not found..."; return false; } if (!graph_.get_lid(src_label, src, src_vid)) { std::string label_name = graph_.schema().get_vertex_label_name(src_label); - LOG(ERROR) << "Source vertex " << label_name << "[" << src + LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string() << "] not found..."; return false; } @@ -111,8 +112,11 @@ bool SingleVertexInsertTransaction::AddEdge(label_t src_label, oid_t src, << type << ", got " << prop.type; return false; } - arc_ << static_cast(1) << src_label << src << dst_label << dst - << edge_label; + arc_ << static_cast(1) << src_label; + serialize_field(arc_, src); + arc_ << dst_label; + serialize_field(arc_, dst); + arc_ << edge_label; serialize_field(arc_, prop); parsed_endpoints_.push_back(src_vid); parsed_endpoints_.push_back(dst_vid); @@ -162,17 +166,18 @@ void SingleVertexInsertTransaction::ingestWal() { uint8_t op_type; arc >> op_type; if (op_type == 0) { - arc.GetBytes(sizeof(label_t) + sizeof(oid_t)); + Any temp; + deserialize_oid(graph_, arc, temp); added_vertex_vid_ = graph_.add_vertex(added_vertex_label_, added_vertex_id_); graph_.get_vertex_table(added_vertex_label_) .ingest(added_vertex_vid_, arc); } else if (op_type == 1) { + Any temp; label_t src_label, dst_label, edge_label; - arc >> src_label; - arc.GetBytes(sizeof(oid_t)); - arc >> dst_label; - arc.GetBytes(sizeof(oid_t)); + src_label = deserialize_oid(graph_, arc, temp); + dst_label = deserialize_oid(graph_, arc, temp); + arc >> edge_label; vid_t src_vid = *(vid_ptr++); diff --git a/flex/engines/graph_db/database/single_vertex_insert_transaction.h b/flex/engines/graph_db/database/single_vertex_insert_transaction.h index 2d7ffd9e0c44..305fbe86a20b 100644 --- a/flex/engines/graph_db/database/single_vertex_insert_transaction.h +++ b/flex/engines/graph_db/database/single_vertex_insert_transaction.h @@ -17,6 +17,7 @@ #define GRAPHSCOPE_DATABASE_SINGLE_VERTEX_INSERT_TRANSACTION_H_ #include "flex/storages/rt_mutable_graph/types.h" +#include "flex/utils/property/types.h" #include "grape/serialization/in_archive.h" namespace gs { @@ -25,7 +26,6 @@ class MutablePropertyFragment; class ArenaAllocator; class WalWriter; class VersionManager; -class Any; class SingleVertexInsertTransaction { public: @@ -34,10 +34,10 @@ class SingleVertexInsertTransaction { VersionManager& vm, timestamp_t timestamp); ~SingleVertexInsertTransaction(); - bool AddVertex(label_t label, oid_t id, const std::vector& props); + bool AddVertex(label_t label, const Any& id, const std::vector& props); - bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst, - label_t edge_label, const Any& prop); + bool AddEdge(label_t src_label, const Any& src, label_t dst_label, + const Any& dst, label_t edge_label, const Any& prop); void Commit(); @@ -53,7 +53,7 @@ class SingleVertexInsertTransaction { grape::InArchive arc_; label_t added_vertex_label_; - oid_t added_vertex_id_; + Any added_vertex_id_; vid_t added_vertex_vid_; std::vector parsed_endpoints_; diff --git a/flex/engines/graph_db/database/transaction_utils.h b/flex/engines/graph_db/database/transaction_utils.h index f219414f15ec..13aac2b70761 100644 --- a/flex/engines/graph_db/database/transaction_utils.h +++ b/flex/engines/graph_db/database/transaction_utils.h @@ -16,6 +16,7 @@ #ifndef GRAPHSCOPE_DATABASE_TRANSACTION_UTILS_H_ #define GRAPHSCOPE_DATABASE_TRANSACTION_UTILS_H_ +#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h" #include "flex/utils/property/types.h" #include "glog/logging.h" #include "grape/serialization/in_archive.h" @@ -71,6 +72,15 @@ inline void deserialize_field(grape::OutArchive& arc, Any& prop) { } } +inline label_t deserialize_oid(const MutablePropertyFragment& graph, + grape::OutArchive& arc, Any& oid) { + label_t label; + arc >> label; + oid.type = std::get<0>(graph.schema_.get_vertex_primary_key(label).at(0)); + deserialize_field(arc, oid); + return label; +} + } // namespace gs #endif // GRAPHSCOPE_DATABASE_TRANSACTION_UTILS_H_ diff --git a/flex/engines/graph_db/database/update_transaction.cc b/flex/engines/graph_db/database/update_transaction.cc index 89ddb75c1c05..35f63ff6822c 100644 --- a/flex/engines/graph_db/database/update_transaction.cc +++ b/flex/engines/graph_db/database/update_transaction.cc @@ -37,8 +37,18 @@ UpdateTransaction::UpdateTransaction(MutablePropertyFragment& graph, vertex_label_num_ = graph_.schema().vertex_label_num(); edge_label_num_ = graph_.schema().edge_label_num(); + for (auto idx = 0; idx < vertex_label_num_; ++idx) { + if (graph_.lf_indexers_[idx].get_type() == PropertyType::kInt64) { + added_vertices_.emplace_back( + std::make_shared>()); + } else if (graph_.lf_indexers_[idx].get_type() == PropertyType::kString) { + added_vertices_.emplace_back( + std::make_shared>()); + } else { + LOG(FATAL) << "Only int64 and string_view types for pk are supported.."; + } + } - added_vertices_.resize(vertex_label_num_); added_vertices_base_.resize(vertex_label_num_); vertex_nums_.resize(vertex_label_num_); for (size_t i = 0; i < vertex_label_num_; ++i) { @@ -83,7 +93,7 @@ void UpdateTransaction::Commit() { void UpdateTransaction::Abort() { release(); } -bool UpdateTransaction::AddVertex(label_t label, oid_t oid, +bool UpdateTransaction::AddVertex(label_t label, const Any& oid, const std::vector& props) { vid_t id; const std::vector& types = @@ -98,7 +108,7 @@ bool UpdateTransaction::AddVertex(label_t label, oid_t oid, } } if (!oid_to_lid(label, oid, id)) { - added_vertices_[label]._add(oid); + added_vertices_[label]->_add(oid); id = vertex_nums_[label]++; } @@ -113,14 +123,15 @@ bool UpdateTransaction::AddVertex(label_t label, oid_t oid, extra_vertex_properties_[label].ingest(row_num, oarc); op_num_ += 1; - arc_ << static_cast(0) << label << oid; + arc_ << static_cast(0) << label; + serialize_field(arc_, oid); arc_.AddBytes(arc.GetBuffer(), arc.GetSize()); return true; } -bool UpdateTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label, - oid_t dst, label_t edge_label, - const Any& value) { +bool UpdateTransaction::AddEdge(label_t src_label, const Any& src, + label_t dst_label, const Any& dst, + label_t edge_label, const Any& value) { vid_t src_lid, dst_lid; if (!oid_to_lid(src_label, src, src_lid)) { return false; @@ -142,8 +153,11 @@ bool UpdateTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label, updated_edge_data_[out_csr_index][src_lid].emplace(dst_lid, value); op_num_ += 1; - arc_ << static_cast(1) << src_label << src << dst_label << dst - << edge_label; + arc_ << static_cast(1) << src_label; + serialize_field(arc_, src); + arc_ << dst_label; + serialize_field(arc_, dst); + arc_ << edge_label; serialize_field(arc_, value); return true; @@ -160,7 +174,7 @@ void UpdateTransaction::vertex_iterator::Goto(vid_t target) { cur_ = std::min(target, num_); } -oid_t UpdateTransaction::vertex_iterator::GetId() const { +Any UpdateTransaction::vertex_iterator::GetId() const { return txn_->lid_to_oid(label_, cur_); } @@ -346,7 +360,9 @@ bool UpdateTransaction::SetVertexField(label_t label, vid_t lid, int col_id, } op_num_ += 1; - arc_ << static_cast(2) << label << lid_to_oid(label, lid) << col_id; + arc_ << static_cast(2) << label; + serialize_field(arc_, lid_to_oid(label, lid)); + arc_ << col_id; serialize_field(arc_, value); return true; } @@ -367,9 +383,11 @@ void UpdateTransaction::SetEdgeData(bool dir, label_t label, vid_t v, } op_num_ += 1; - arc_ << static_cast(3) << static_cast(dir ? 1 : 0) << label - << lid_to_oid(label, v) << neighbor_label - << lid_to_oid(neighbor_label, nbr) << edge_label; + arc_ << static_cast(3) << static_cast(dir ? 1 : 0) << label; + serialize_field(arc_, lid_to_oid(label, v)); + arc_ << neighbor_label; + serialize_field(arc_, lid_to_oid(neighbor_label, nbr)); + arc_ << edge_label; serialize_field(arc_, value); } @@ -396,7 +414,7 @@ bool UpdateTransaction::GetUpdatedEdgeData(bool dir, label_t label, vid_t v, void UpdateTransaction::IngestWal(MutablePropertyFragment& graph, uint32_t timestamp, char* data, size_t length, ArenaAllocator& alloc) { - std::vector> added_vertices; + std::vector>> added_vertices; std::vector added_vertices_base; std::vector vertex_nums; std::vector> vertex_offsets; @@ -409,7 +427,18 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph, size_t vertex_label_num = graph.schema().vertex_label_num(); size_t edge_label_num = graph.schema().edge_label_num(); - added_vertices.resize(vertex_label_num); + for (auto idx = 0; idx < vertex_label_num; ++idx) { + if (graph.lf_indexers_[idx].get_type() == PropertyType::kInt64) { + added_vertices.emplace_back( + std::make_shared>()); + } else if (graph.lf_indexers_[idx].get_type() == PropertyType::kString) { + added_vertices.emplace_back( + std::make_shared>()); + } else { + LOG(FATAL) << "Only int64 and string_view types for pk are supported.."; + } + } + added_vertices_base.resize(vertex_label_num); vertex_nums.resize(vertex_label_num); for (size_t i = 0; i < vertex_label_num; ++i) { @@ -434,9 +463,8 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph, arc >> op_type; if (op_type == 0) { label_t label; - oid_t oid; - - arc >> label >> oid; + Any oid; + label = deserialize_oid(graph, arc, oid); vid_t vid; if (!graph.get_lid(label, oid, vid)) { vid = graph.add_vertex(label, oid); @@ -444,30 +472,33 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph, graph.get_vertex_table(label).ingest(vid, arc); } else if (op_type == 1) { label_t src_label, dst_label, edge_label; - oid_t src, dst; + Any src, dst; vid_t src_vid, dst_vid; - - arc >> src_label >> src >> dst_label >> dst >> edge_label; + src_label = deserialize_oid(graph, arc, src); + dst_label = deserialize_oid(graph, arc, dst); + arc >> edge_label; CHECK(graph.get_lid(src_label, src, src_vid)); CHECK(graph.get_lid(dst_label, dst, dst_vid)); graph.IngestEdge(src_label, src_vid, dst_label, dst_vid, edge_label, timestamp, arc, alloc); } else if (op_type == 2) { label_t label; - oid_t oid; + Any oid; int col_id; - - arc >> label >> oid >> col_id; + label = deserialize_oid(graph, arc, oid); + arc >> col_id; vid_t vid; CHECK(graph.get_lid(label, oid, vid)); graph.get_vertex_table(label).get_column_by_id(col_id)->ingest(vid, arc); } else if (op_type == 3) { uint8_t dir; label_t label, neighbor_label, edge_label; - oid_t v, nbr; + Any v, nbr; vid_t v_lid, nbr_lid; - - arc >> dir >> label >> v >> neighbor_label >> nbr >> edge_label; + arc >> dir; + label = deserialize_oid(graph, arc, v); + neighbor_label = deserialize_oid(graph, arc, nbr); + arc >> edge_label; CHECK(graph.get_lid(label, v, v_lid)); CHECK(graph.get_lid(neighbor_label, nbr, nbr_lid)); @@ -511,11 +542,12 @@ size_t UpdateTransaction::get_out_csr_index(label_t src_label, vertex_label_num_ * vertex_label_num_ * edge_label_num_; } -bool UpdateTransaction::oid_to_lid(label_t label, oid_t oid, vid_t& lid) const { +bool UpdateTransaction::oid_to_lid(label_t label, const Any& oid, + vid_t& lid) const { if (graph_.get_lid(label, oid, lid)) { return true; } else { - if (added_vertices_[label].get_index(oid, lid)) { + if (added_vertices_[label]->get_index(oid, lid)) { lid += added_vertices_base_[label]; return true; } @@ -523,13 +555,13 @@ bool UpdateTransaction::oid_to_lid(label_t label, oid_t oid, vid_t& lid) const { return false; } -oid_t UpdateTransaction::lid_to_oid(label_t label, vid_t lid) const { +Any UpdateTransaction::lid_to_oid(label_t label, vid_t lid) const { if (graph_.vertex_num(label) > lid) { return graph_.get_oid(label, lid); } else { - oid_t ret; - CHECK( - added_vertices_[label].get_key(lid - added_vertices_base_[label], ret)); + Any ret; + CHECK(added_vertices_[label]->get_key(lid - added_vertices_base_[label], + ret)); return ret; } } @@ -553,19 +585,19 @@ void UpdateTransaction::release() { void UpdateTransaction::applyVerticesUpdates() { for (label_t label = 0; label < vertex_label_num_; ++label) { - std::vector> added_vertices; - vid_t added_vertices_num = added_vertices_[label].size(); + std::vector> added_vertices; + vid_t added_vertices_num = added_vertices_[label]->size(); for (vid_t v = 0; v < added_vertices_num; ++v) { vid_t lid = v + added_vertices_base_[label]; - oid_t oid; - CHECK(added_vertices_[label].get_key(v, oid)); + Any oid; + CHECK(added_vertices_[label]->get_key(v, oid)); added_vertices.emplace_back(lid, oid); } - std::sort(added_vertices.begin(), added_vertices.end(), - [](const std::pair& lhs, - const std::pair& rhs) { - return lhs.first < rhs.first; - }); + std::sort( + added_vertices.begin(), added_vertices.end(), + [](const std::pair& lhs, const std::pair& rhs) { + return lhs.first < rhs.first; + }); auto& table = extra_vertex_properties_[label]; auto& vertex_offset = vertex_offsets_[label]; diff --git a/flex/engines/graph_db/database/update_transaction.h b/flex/engines/graph_db/database/update_transaction.h index f4d0e9d6b1d3..4926df009978 100644 --- a/flex/engines/graph_db/database/update_transaction.h +++ b/flex/engines/graph_db/database/update_transaction.h @@ -48,10 +48,10 @@ class UpdateTransaction { void Abort(); - bool AddVertex(label_t label, oid_t oid, const std::vector& props); + bool AddVertex(label_t label, const Any& oid, const std::vector& props); - bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst, - label_t edge_label, const Any& value); + bool AddEdge(label_t src_label, const Any& src, label_t dst_label, + const Any& dst, label_t edge_label, const Any& value); class vertex_iterator { public: @@ -62,7 +62,7 @@ class UpdateTransaction { void Next(); void Goto(vid_t target); - oid_t GetId() const; + Any GetId() const; vid_t GetIndex() const; @@ -146,9 +146,9 @@ class UpdateTransaction { size_t get_out_csr_index(label_t src_label, label_t dst_label, label_t edge_label) const; - bool oid_to_lid(label_t label, oid_t oid, vid_t& lid) const; + bool oid_to_lid(label_t label, const Any& oid, vid_t& lid) const; - oid_t lid_to_oid(label_t label, vid_t lid) const; + Any lid_to_oid(label_t label, vid_t lid) const; void release(); @@ -168,7 +168,7 @@ class UpdateTransaction { size_t vertex_label_num_; size_t edge_label_num_; - std::vector> added_vertices_; + std::vector>> added_vertices_; std::vector added_vertices_base_; std::vector vertex_nums_; std::vector> vertex_offsets_; diff --git a/flex/engines/graph_db/grin/CMakeLists.txt b/flex/engines/graph_db/grin/CMakeLists.txt index 12b016724d28..2e855d82eab8 100644 --- a/flex/engines/graph_db/grin/CMakeLists.txt +++ b/flex/engines/graph_db/grin/CMakeLists.txt @@ -50,16 +50,10 @@ endif () find_package(yaml-cpp REQUIRED) include_directories(SYSTEM ${yaml-cpp_INCLUDE_DIRS}) -#set(yaml-cpp_INCLUDE_DIRS "/usr/local/Cellar/yaml-cpp/0.7.0/include") -#include_directories(SYSTEM ${yaml-cpp_INCLUDE_DIRS}) -#set(YAML_CPP_LIBRARIES "/usr/local/Cellar/yaml-cpp/0.7.0/lib/libyaml-cpp.0.7.0.dylib") - - include_directories(${CMAKE_CURRENT_SOURCE_DIR}) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/..) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../..) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../../..) -message(STATUS "${CMAKE_CURRENT_SOURCE_DIR}") include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../../storages/rt_mutable_graph) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../../utils/property) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) diff --git a/flex/engines/graph_db/grin/src/index/pk.cc b/flex/engines/graph_db/grin/src/index/pk.cc index 6227edd46482..996a0d9fcbf0 100644 --- a/flex/engines/graph_db/grin/src/index/pk.cc +++ b/flex/engines/graph_db/grin/src/index/pk.cc @@ -31,10 +31,20 @@ GRIN_VERTEX grin_get_vertex_by_primary_keys_row(GRIN_GRAPH g, GRIN_ROW r) { auto _r = static_cast(r); auto _g = static_cast(g); - auto oid = *static_cast((*_r)[0]); + auto type = _g->g.lf_indexers_[label].get_type(); uint32_t vid; - if (!_g->g.get_lid(label, oid, vid)) { + if (type == gs::PropertyType::kInt64) { + auto oid = *static_cast((*_r)[0]); + if (!_g->g.get_lid(label, oid, vid)) { + return GRIN_NULL_VERTEX; + } + } else if (type == gs::PropertyType::kString) { + auto oid = *static_cast((*_r)[0]); + if (!_g->g.get_lid(label, oid, vid)) { + return GRIN_NULL_VERTEX; + } + } else { return GRIN_NULL_VERTEX; } uint64_t v = ((label * 1ull) << 32) + vid; diff --git a/flex/engines/graph_db/grin/src/predefine.h b/flex/engines/graph_db/grin/src/predefine.h index ad6df6d85555..4c8e3e1c89f1 100644 --- a/flex/engines/graph_db/grin/src/predefine.h +++ b/flex/engines/graph_db/grin/src/predefine.h @@ -5,7 +5,6 @@ #include "storages/rt_mutable_graph/loading_config.h" #include "storages/rt_mutable_graph/mutable_property_fragment.h" -typedef gs::oid_t GRIN_OID_T; typedef gs::vid_t GRIN_VID_T; typedef struct GRIN_GRAPH_T { diff --git a/flex/engines/graph_db/grin/src/property/primarykey.cc b/flex/engines/graph_db/grin/src/property/primarykey.cc index 878fa0b2b32f..ba6729c2ae28 100644 --- a/flex/engines/graph_db/grin/src/property/primarykey.cc +++ b/flex/engines/graph_db/grin/src/property/primarykey.cc @@ -27,12 +27,20 @@ GRIN_VERTEX_TYPE_LIST grin_get_vertex_types_with_primary_keys(GRIN_GRAPH g) { * @return The primary keys properties list */ GRIN_VERTEX_PROPERTY_LIST grin_get_primary_keys_by_vertex_type( - GRIN_GRAPH, GRIN_VERTEX_TYPE label) { + GRIN_GRAPH g, GRIN_VERTEX_TYPE label) { + auto _g = static_cast(g); + auto type = _g->g.lf_indexers_[label].get_type(); GRIN_VERTEX_PROPERTY_LIST_T* vpl = new GRIN_VERTEX_PROPERTY_LIST_T(); GRIN_VERTEX_PROPERTY vp; vp = 0; vp += (label * 1u) << 8; - vp += (GRIN_DATATYPE::Int64 * 1u) << 16; + if (type == gs::PropertyType::kInt64) { + vp += (GRIN_DATATYPE::Int64 * 1u) << 16; + } else if (type == gs::PropertyType::kString) { + vp += (GRIN_DATATYPE::String * 1u) << 16; + } else { + vp = GRIN_NULL_VERTEX_PROPERTY; + } vpl->emplace_back(vp); return vpl; } @@ -49,9 +57,18 @@ GRIN_ROW grin_get_vertex_primary_keys_row(GRIN_GRAPH g, GRIN_VERTEX v) { auto _g = static_cast(g); auto vid = v & (0xffffffff); auto label = v >> 32; - auto oid = _g->g.get_oid(label, vid); - auto p = new gs::oid_t(oid); - row->emplace_back(p); + auto type = _g->g.lf_indexers_[label].get_type(); + if (type == gs::PropertyType::kInt64) { + auto oid = _g->g.get_oid(label, vid).AsInt64(); + auto p = new int64_t(oid); + row->emplace_back(p); + } else if (type == gs::PropertyType::kString) { + auto oid = _g->g.get_oid(label, vid).AsStringView(); + auto p = new std::string_view(oid); + row->emplace_back(p); + } else { + return GRIN_NULL_ROW; + } return row; } #endif diff --git a/flex/engines/hqps_db/core/operator/scan.h b/flex/engines/hqps_db/core/operator/scan.h index 7caf9e4bb686..5a67c1fc07a4 100644 --- a/flex/engines/hqps_db/core/operator/scan.h +++ b/flex/engines/hqps_db/core/operator/scan.h @@ -96,9 +96,10 @@ class Scan { /// @param v_label_id /// @param oid /// @return + template static vertex_set_t ScanVertexWithOid(const GRAPH_INTERFACE& graph, const label_id_t& v_label_id, - int64_t oid) { + OID_T oid) { std::vector gids; vertex_id_t vid; if (graph.ScanVerticesWithOid(v_label_id, oid, vid)) { @@ -112,10 +113,10 @@ class Scan { /// @param v_label_ids /// @param oid /// @return - template + template static auto ScanVertexWithOid( const GRAPH_INTERFACE& graph, - const std::array& v_label_ids, int64_t oid) { + const std::array& v_label_ids, OID_T oid) { std::vector gids; std::vector labels_vec; std::vector bitsets; diff --git a/flex/engines/hqps_db/core/sync_engine.h b/flex/engines/hqps_db/core/sync_engine.h index 5aaa10f0c856..649830710f94 100644 --- a/flex/engines/hqps_db/core/sync_engine.h +++ b/flex/engines/hqps_db/core/sync_engine.h @@ -161,12 +161,12 @@ class SyncEngine : public BaseEngine { return Context(std::move(v_set_tuple)); } - template ::type* = nullptr, typename COL_T = default_vertex_set_t> static Context ScanVertexWithOid( - const GRAPH_INTERFACE& graph, LabelT v_label, int64_t oid) { + const GRAPH_INTERFACE& graph, LabelT v_label, OID_T oid) { auto v_set_tuple = Scan::ScanVertexWithOid(graph, v_label, oid); @@ -174,26 +174,28 @@ class SyncEngine : public BaseEngine { } template < - AppendOpt append_opt, typename LabelT, + AppendOpt append_opt, typename OID_T, typename LabelT, typename std::enable_if<(append_opt == AppendOpt::Temp)>::type* = nullptr, typename COL_T = default_vertex_set_t> static Context ScanVertexWithOid( - const GRAPH_INTERFACE& graph, LabelT v_label, int64_t oid) { - auto v_set_tuple = - Scan::ScanVertexWithOid(graph, v_label, oid); + const GRAPH_INTERFACE& graph, LabelT v_label, OID_T oid) { + auto v_set_tuple = Scan::template ScanVertexWithOid( + graph, v_label, oid); return Context(std::move(v_set_tuple)); } - template ::type* = nullptr, typename COL_T = GeneralVertexSet> static Context ScanVertexWithOid( const GRAPH_INTERFACE& graph, std::array v_labels, - int64_t oid) { + OID_T oid) { auto v_set_tuple = - Scan::ScanVertexWithOid(graph, v_labels, oid); + Scan::template ScanVertexWithOid( + graph, v_labels, oid); return Context(std::move(v_set_tuple)); } diff --git a/flex/engines/hqps_db/database/mutable_csr_interface.h b/flex/engines/hqps_db/database/mutable_csr_interface.h index 4153f8a00d8d..cb16633ec245 100644 --- a/flex/engines/hqps_db/database/mutable_csr_interface.h +++ b/flex/engines/hqps_db/database/mutable_csr_interface.h @@ -66,7 +66,6 @@ class MutableCSRInterface { const GraphDBSession& GetDBSession() const { return db_session_; } using vertex_id_t = vid_t; - using outer_vertex_id_t = oid_t; using label_id_t = uint8_t; using nbr_list_array_t = mutable_csr_graph_impl::NbrListArray; @@ -182,10 +181,11 @@ class MutableCSRInterface { * @param label * @param oid */ - bool ScanVerticesWithOid(const std::string& label, outer_vertex_id_t oid, - vertex_id_t& vid) const { + template + vertex_id_t ScanVerticesWithOid(const std::string& label, OID_T oid, + vertex_id_t& vid) const { auto label_id = db_session_.schema().get_vertex_label_id(label); - return db_session_.graph().get_lid(label_id, oid, vid); + return db_session_.graph().get_lid(label_id, Any::From(oid), vid); } /** @@ -194,9 +194,10 @@ class MutableCSRInterface { * @param label_id * @param oid */ - bool ScanVerticesWithOid(const label_id_t& label_id, outer_vertex_id_t oid, - vertex_id_t& vid) const { - return db_session_.graph().get_lid(label_id, oid, vid); + template + vertex_id_t ScanVerticesWithOid(const label_id_t& label_id, OID_T oid, + vertex_id_t& vid) const { + return db_session_.graph().get_lid(label_id, Any::From(oid), vid); } /** @@ -237,7 +238,7 @@ class MutableCSRInterface { std::vector> props(oids.size()); for (size_t i = 0; i < oids.size(); ++i) { - db_session_.graph().get_lid(label_id, oids[i], vids[i]); + db_session_.graph().get_lid(label_id, Any::From(oids[i]), vids[i]); get_tuple_from_column_tuple(vids[i], props[i], columns); } diff --git a/flex/resources/queries/ic/adhoc/simple_match_6.cypher b/flex/resources/queries/ic/adhoc/simple_match_6.cypher index 2a40870481c4..4ddea34b628f 100644 --- a/flex/resources/queries/ic/adhoc/simple_match_6.cypher +++ b/flex/resources/queries/ic/adhoc/simple_match_6.cypher @@ -1 +1 @@ -MATCH(a: PERSON) where a.id = 933 return a.firstName AS firstName, a.lastName as lastName; \ No newline at end of file +MATCH(a: PERSON) where a.id = 933L return a.firstName AS firstName, a.lastName as lastName; \ No newline at end of file diff --git a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc index 4da485d490d3..17df08e509f6 100644 --- a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc @@ -80,12 +80,6 @@ void BasicFragmentLoader::AddVertexBatch( } } -void BasicFragmentLoader::FinishAddingVertex( - label_t v_label, const IdIndexer& indexer) { - CHECK(v_label < vertex_label_num_); - build_lf_indexer(indexer, lf_indexers_[v_label]); -} - const LFIndexer& BasicFragmentLoader::GetLFIndexer( label_t v_label) const { CHECK(v_label < vertex_label_num_); diff --git a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h index 85fed7cd2c24..823507af5746 100644 --- a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h @@ -53,8 +53,12 @@ class BasicFragmentLoader { dst_columns[col_ind]->set_any(vid, prop); } + template void FinishAddingVertex(label_t v_label, - const IdIndexer& indexer); + const IdIndexer& indexer) { + CHECK(v_label < vertex_label_num_); + build_lf_indexer(indexer, lf_indexers_[v_label]); + } template void AddNoPropEdgeBatch(label_t src_label_id, label_t dst_label_id, diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc index f5f61448d0ee..8b41d9ec2538 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc @@ -274,35 +274,78 @@ static void set_vertex_properties(gs::ColumnBase* col, template static void append_edges( - std::shared_ptr src_col, - std::shared_ptr dst_col, - const LFIndexer& src_indexer, const LFIndexer& dst_indexer, + std::shared_ptr src_col, + std::shared_ptr dst_col, const LFIndexer& src_indexer, + const LFIndexer& dst_indexer, std::vector>& edata_cols, std::vector>& parsed_edges, std::vector& ie_degree, std::vector& oe_degree) { CHECK(src_col->length() == dst_col->length()); + if (src_indexer.get_type() == PropertyType::kInt64) { + CHECK(src_col->type() == arrow::int64()); + } else if (src_indexer.get_type() == PropertyType::kString) { + CHECK(src_col->type() == arrow::utf8() || + src_col->type() == arrow::large_utf8()); + } + if (dst_indexer.get_type() == PropertyType::kInt64) { + CHECK(dst_col->type() == arrow::int64()); + } else if (dst_indexer.get_type() == PropertyType::kString) { + CHECK(dst_col->type() == arrow::utf8() || + dst_col->type() == arrow::large_utf8()); + } auto old_size = parsed_edges.size(); parsed_edges.resize(old_size + src_col->length()); VLOG(10) << "resize parsed_edges from" << old_size << " to " << parsed_edges.size(); - auto src_col_thread = std::thread([&]() { + auto _append = [&](bool is_dst) { size_t cur_ind = old_size; - for (auto i = 0; i < src_col->length(); ++i) { - auto src_vid = src_indexer.get_index(src_col->Value(i)); - std::get<0>(parsed_edges[cur_ind++]) = src_vid; - oe_degree[src_vid]++; - } - }); - auto dst_col_thread = std::thread([&]() { - size_t cur_ind = old_size; - for (auto i = 0; i < dst_col->length(); ++i) { - auto dst_vid = dst_indexer.get_index(dst_col->Value(i)); - std::get<1>(parsed_edges[cur_ind++]) = dst_vid; - ie_degree[dst_vid]++; + const auto& col = is_dst ? dst_col : src_col; + const auto& indexer = is_dst ? dst_indexer : src_indexer; + if (col->type() == arrow::int64()) { + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto vid = indexer.get_index(Any::From(casted->Value(j))); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + is_dst ? ie_degree[vid]++ : oe_degree[vid]++; + } + } else if (col->type() == arrow::utf8()) { + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto str = casted->GetView(j); + std::string_view str_view(str.data(), str.size()); + auto vid = indexer.get_index(Any::From(str_view)); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + + is_dst ? ie_degree[vid]++ : oe_degree[vid]++; + } + } else if (col->type() == arrow::large_utf8()) { + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto str = casted->GetView(j); + std::string_view str_view(str.data(), str.size()); + auto vid = indexer.get_index(Any::From(str_view)); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + is_dst ? ie_degree[vid]++ : oe_degree[vid]++; + } } - }); + }; + + auto src_col_thread = std::thread([&]() { _append(false); }); + auto dst_col_thread = std::thread([&]() { _append(true); }); src_col_thread.join(); dst_col_thread.join(); @@ -345,40 +388,74 @@ static void append_edges( std::vector>& parsed_edges, std::vector& ie_degree, std::vector& oe_degree) { CHECK(src_col->length() == dst_col->length()); - CHECK(src_col->type() == arrow::int64()); - CHECK(dst_col->type() == arrow::int64()); + if (src_indexer.get_type() == PropertyType::kInt64) { + CHECK(src_col->type() == arrow::int64()); + } else if (src_indexer.get_type() == PropertyType::kString) { + CHECK(src_col->type() == arrow::utf8() || + src_col->type() == arrow::large_utf8()); + } + + if (dst_indexer.get_type() == PropertyType::kInt64) { + CHECK(dst_col->type() == arrow::int64()); + } else if (dst_indexer.get_type() == PropertyType::kString) { + CHECK(dst_col->type() == arrow::utf8() || + dst_col->type() == arrow::large_utf8()); + } auto old_size = parsed_edges.size(); parsed_edges.resize(old_size + src_col->length()); VLOG(10) << "resize parsed_edges from" << old_size << " to " << parsed_edges.size(); - - auto src_col_thread = std::thread([&]() { - size_t cur_ind = old_size; - for (auto i = 0; i < src_col->num_chunks(); ++i) { - auto chunk = src_col->chunk(i); - CHECK(chunk->type() == arrow::int64()); - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto src_vid = src_indexer.get_index(casted_chunk->Value(j)); - std::get<0>(parsed_edges[cur_ind++]) = src_vid; - oe_degree[src_vid]++; - } - } - }); - auto dst_col_thread = std::thread([&]() { + auto _append = [&](bool is_dst) { size_t cur_ind = old_size; - for (auto i = 0; i < dst_col->num_chunks(); ++i) { - auto chunk = dst_col->chunk(i); - CHECK(chunk->type() == arrow::int64()); - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto dst_vid = dst_indexer.get_index(casted_chunk->Value(j)); - std::get<1>(parsed_edges[cur_ind++]) = dst_vid; - ie_degree[dst_vid]++; + const auto& col = is_dst ? dst_col : src_col; + const auto& indexer = is_dst ? dst_indexer : src_indexer; + for (auto i = 0; i < col->num_chunks(); ++i) { + auto chunk = col->chunk(i); + CHECK(chunk->type() == col->type()); + if (col->type() == arrow::int64()) { + auto casted_chunk = std::static_pointer_cast(chunk); + for (auto j = 0; j < casted_chunk->length(); ++j) { + auto vid = indexer.get_index(Any::From(casted_chunk->Value(j))); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + is_dst ? ie_degree[vid]++ : oe_degree[vid]++; + } + } else if (col->type() == arrow::utf8()) { + auto casted_chunk = std::static_pointer_cast(chunk); + for (auto j = 0; j < casted_chunk->length(); ++j) { + auto str = casted_chunk->GetView(j); + std::string_view str_view(str.data(), str.size()); + auto vid = indexer.get_index(Any::From(str_view)); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + is_dst ? ie_degree[vid]++ : oe_degree[vid]++; + } + } else if (col->type() == arrow::large_utf8()) { + auto casted_chunk = + std::static_pointer_cast(chunk); + for (auto j = 0; j < casted_chunk->length(); ++j) { + auto str = casted_chunk->GetView(j); + std::string_view str_view(str.data(), str.size()); + auto vid = indexer.get_index(Any::From(str_view)); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + is_dst ? ie_degree[vid]++ : oe_degree[vid]++; + } } } - }); + }; + auto src_col_thread = std::thread([&]() { _append(false); }); + auto dst_col_thread = std::thread([&]() { _append(true); }); // if EDATA_T is grape::EmptyType, no need to read columns auto edata_col_thread = std::thread([&]() { @@ -533,32 +610,83 @@ CSVFragmentLoader::createEdgeTableReader(label_t src_label_id, return res.ValueOrDie(); } +template +struct _add_vertex { + void operator()(const std::shared_ptr& col, + IdIndexer& indexer, std::vector& vids) {} +}; + +template <> +struct _add_vertex { + void operator()(const std::shared_ptr& col, + IdIndexer& indexer, + std::vector& vids) { + CHECK(col->type() == arrow::int64()); + size_t row_num = col->length(); + + auto casted_array = std::static_pointer_cast(col); + vid_t vid; + for (auto i = 0; i < row_num; ++i) { + if (!indexer.add(casted_array->Value(i), vid)) { + LOG(FATAL) << "Duplicate vertex id: " << casted_array->Value(i) << ".."; + } + vids.emplace_back(vid); + } + } +}; + +template <> +struct _add_vertex { + void operator()(const std::shared_ptr& col, + IdIndexer& indexer, + std::vector& vids) { + size_t row_num = col->length(); + CHECK(col->type() == arrow::utf8() || col->type() == arrow::large_utf8()); + if (col->type() == arrow::utf8()) { + auto casted_array = std::static_pointer_cast(col); + vid_t vid; + for (auto i = 0; i < row_num; ++i) { + auto str = casted_array->GetView(i); + std::string_view str_view(str.data(), str.size()); + + if (!indexer.add(str_view, vid)) { + LOG(FATAL) << "Duplicate vertex id: " << str_view << ".."; + } + vids.emplace_back(vid); + } + } else { + auto casted_array = + std::static_pointer_cast(col); + vid_t vid; + for (auto i = 0; i < row_num; ++i) { + auto str = casted_array->GetView(i); + std::string_view str_view(str.data(), str.size()); + + if (!indexer.add(str_view, vid)) { + LOG(FATAL) << "Duplicate vertex id: " << str_view << ".."; + } + vids.emplace_back(vid); + } + } + } +}; + +template void CSVFragmentLoader::addVertexBatch( - label_t v_label_id, IdIndexer& indexer, + label_t v_label_id, IdIndexer& indexer, std::shared_ptr& primary_key_col, const std::vector>& property_cols) { size_t row_num = primary_key_col->length(); - CHECK_EQ(primary_key_col->type()->id(), arrow::Type::INT64); auto col_num = property_cols.size(); for (size_t i = 0; i < col_num; ++i) { CHECK_EQ(property_cols[i]->length(), row_num); } - auto casted_array = - std::static_pointer_cast(primary_key_col); - std::vector> prop_vec(property_cols.size()); double t = -grape::GetCurrentTime(); - vid_t vid; std::vector vids; vids.reserve(row_num); - for (auto i = 0; i < row_num; ++i) { - if (!indexer.add(casted_array->Value(i), vid)) { - LOG(FATAL) << "Duplicate vertex id: " << casted_array->Value(i) << " for " - << schema_.get_vertex_label_name(v_label_id); - } - vids.emplace_back(vid); - } + _add_vertex()(primary_key_col, indexer, vids); t += grape::GetCurrentTime(); for (double tmp = convert_to_internal_vertex_time_; !convert_to_internal_vertex_time_.compare_exchange_weak(tmp, tmp + t);) { @@ -580,34 +708,94 @@ void CSVFragmentLoader::addVertexBatch( VLOG(10) << "Insert rows: " << row_num; } +template +struct _add_vertex_chunk { + void operator()(const std::shared_ptr& col, + IdIndexer& indexer, std::vector& vids) {} +}; + +template <> +struct _add_vertex_chunk { + void operator()(const std::shared_ptr& col, + IdIndexer& indexer, + std::vector& vids) { + CHECK(col->type() == arrow::int64()); + size_t row_num = col->length(); + + for (auto i = 0; i < col->num_chunks(); ++i) { + auto chunk = col->chunk(i); + auto casted_array = std::static_pointer_cast(chunk); + for (auto j = 0; j < casted_array->length(); ++j) { + vid_t vid; + if (!indexer.add(casted_array->Value(j), vid)) { + LOG(FATAL) << "Duplicate vertex id: " << casted_array->Value(j) + << " .. "; + } + vids.emplace_back(vid); + } + } + } +}; + +template <> +struct _add_vertex_chunk { + void operator()(const std::shared_ptr& col, + IdIndexer& indexer, + std::vector& vids) { + CHECK(col->type() == arrow::utf8() || col->type() == arrow::large_utf8()); + size_t row_num = col->length(); + + if (col->type() == arrow::utf8()) { + for (auto i = 0; i < col->num_chunks(); ++i) { + auto chunk = col->chunk(i); + auto casted_array = std::static_pointer_cast(chunk); + for (auto j = 0; j < casted_array->length(); ++j) { + vid_t vid; + auto str = casted_array->GetView(j); + std::string_view str_view(str.data(), str.size()); + + if (!indexer.add(str_view, vid)) { + LOG(FATAL) << "Duplicate vertex id: " << str_view << " .. "; + } + vids.emplace_back(vid); + } + } + } else { + for (auto i = 0; i < col->num_chunks(); ++i) { + auto chunk = col->chunk(i); + auto casted_array = + std::static_pointer_cast(chunk); + for (auto j = 0; j < casted_array->length(); ++j) { + vid_t vid; + auto str = casted_array->GetView(j); + std::string_view str_view(str.data(), str.size()); + + if (!indexer.add(str_view, vid)) { + LOG(FATAL) << "Duplicate vertex id: " << str_view << " .. "; + } + vids.emplace_back(vid); + } + } + } + } +}; + +template void CSVFragmentLoader::addVertexBatch( - label_t v_label_id, IdIndexer& indexer, + label_t v_label_id, IdIndexer& indexer, std::shared_ptr& primary_key_col, const std::vector>& property_cols) { size_t row_num = primary_key_col->length(); std::vector vids; vids.reserve(row_num); - CHECK_EQ(primary_key_col->type()->id(), arrow::Type::INT64); - // check row num + // check row num auto col_num = property_cols.size(); for (size_t i = 0; i < col_num; ++i) { CHECK_EQ(property_cols[i]->length(), row_num); } - std::vector> prop_vec(property_cols.size()); double t = -grape::GetCurrentTime(); - for (auto i = 0; i < primary_key_col->num_chunks(); ++i) { - auto chunk = primary_key_col->chunk(i); - auto casted_array = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_array->length(); ++j) { - vid_t vid; - if (!indexer.add(casted_array->Value(j), vid)) { - LOG(FATAL) << "Duplicate vertex id: " << casted_array->Value(j) - << " for " << schema_.get_vertex_label_name(v_label_id); - } - vids.emplace_back(vid); - } - } + _add_vertex_chunk()(primary_key_col, indexer, vids); t += grape::GetCurrentTime(); for (double tmp = convert_to_internal_vertex_time_; @@ -628,9 +816,10 @@ void CSVFragmentLoader::addVertexBatch( VLOG(10) << "Insert rows: " << row_num; } +template void CSVFragmentLoader::addVerticesImplWithTableReader( const std::string& v_file, label_t v_label_id, - IdIndexer& indexer) { + IdIndexer& indexer) { auto vertex_column_mappings = loading_config_.GetVertexColumnMappings(v_label_id); auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0]; @@ -666,9 +855,10 @@ void CSVFragmentLoader::addVerticesImplWithTableReader( addVertexBatch(v_label_id, indexer, primary_key_column, other_columns_array); } +template void CSVFragmentLoader::addVerticesImplWithStreamReader( const std::string& v_file, label_t v_label_id, - IdIndexer& indexer) { + IdIndexer& indexer) { auto vertex_column_mappings = loading_config_.GetVertexColumnMappings(v_label_id); auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0]; @@ -710,18 +900,19 @@ void CSVFragmentLoader::addVerticesImplWithStreamReader( } } +template void CSVFragmentLoader::addVerticesImpl(label_t v_label_id, const std::string& v_label_name, const std::vector v_files, - IdIndexer& indexer) { + IdIndexer& indexer) { VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label " << v_label_name; for (auto& v_file : v_files) { if (loading_config_.GetIsBatchReader()) { - addVerticesImplWithStreamReader(v_file, v_label_id, indexer); + addVerticesImplWithStreamReader(v_file, v_label_id, indexer); } else { - addVerticesImplWithTableReader(v_file, v_label_id, indexer); + addVerticesImplWithTableReader(v_file, v_label_id, indexer); } } @@ -736,23 +927,36 @@ void CSVFragmentLoader::addVertices(label_t v_label_id, if (primary_keys.size() != 1) { LOG(FATAL) << "Only support one primary key for vertex."; } - if (std::get<0>(primary_keys[0]) != PropertyType::kInt64) { - LOG(FATAL) << "Only support int64_t primary key for vertex."; + auto type = std::get<0>(primary_keys[0]); + if (type != PropertyType::kInt64 && type != PropertyType::kString) { + LOG(FATAL) + << "Only support int64_t and string_view primary key for vertex."; } std::string v_label_name = schema_.get_vertex_label_name(v_label_id); VLOG(10) << "Start init vertices for label " << v_label_name << " with " << v_files.size() << " files."; + if (type == PropertyType::kInt64) { + IdIndexer indexer; + + addVerticesImpl(v_label_id, v_label_name, v_files, indexer); - IdIndexer indexer; + if (indexer.bucket_count() == 0) { + indexer._rehash(schema_.get_max_vnum(v_label_name)); + } + basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer); + } else if (type == PropertyType::kString) { + IdIndexer indexer; - addVerticesImpl(v_label_id, v_label_name, v_files, indexer); + addVerticesImpl(v_label_id, v_label_name, v_files, + indexer); - if (indexer.bucket_count() == 0) { - indexer._rehash(schema_.get_max_vnum(v_label_name)); + if (indexer.bucket_count() == 0) { + indexer._rehash(schema_.get_max_vnum(v_label_name)); + } + basic_fragment_loader_.FinishAddingVertex(v_label_id, + indexer); } - basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer); - VLOG(10) << "Finish init vertices for label " << v_label_name; } @@ -795,9 +999,11 @@ void CSVFragmentLoader::addEdgesImplWithTableReader( CHECK(columns.size() >= 2); auto src_col = columns[0]; auto dst_col = columns[1]; - CHECK(src_col->type() == arrow::int64()) + CHECK(src_col->type() == arrow::int64() || src_col->type() == arrow::utf8() || + src_col->type() == arrow::large_utf8()) << "src_col type: " << src_col->type()->ToString(); - CHECK(dst_col->type() == arrow::int64()) + CHECK(dst_col->type() == arrow::int64() || dst_col->type() == arrow::utf8() || + dst_col->type() == arrow::large_utf8()) << "dst_col type: " << dst_col->type()->ToString(); std::vector> property_cols; @@ -808,8 +1014,6 @@ void CSVFragmentLoader::addEdgesImplWithTableReader( << "Currently only support at most one property on edge"; { CHECK(src_col->length() == dst_col->length()); - CHECK(src_col->type() == arrow::int64()); - CHECK(dst_col->type() == arrow::int64()); t = -grape::GetCurrentTime(); append_edges(src_col, dst_col, src_indexer, dst_indexer, property_cols, parsed_edges, ie_degree, oe_degree); @@ -868,9 +1072,13 @@ void CSVFragmentLoader::addEdgesImplWithStreamReader( CHECK(columns.size() >= 2); auto src_col = columns[0]; auto dst_col = columns[1]; - CHECK(src_col->type() == arrow::int64()) + CHECK(src_col->type() == arrow::int64() || + src_col->type() == arrow::utf8() || + src_col->type() == arrow::large_utf8()) << "src_col type: " << src_col->type()->ToString(); - CHECK(dst_col->type() == arrow::int64()) + CHECK(dst_col->type() == arrow::int64() || + dst_col->type() == arrow::utf8() || + dst_col->type() == arrow::large_utf8()) << "dst_col type: " << dst_col->type()->ToString(); std::vector> property_cols; @@ -882,15 +1090,9 @@ void CSVFragmentLoader::addEdgesImplWithStreamReader( { // add edges to vector CHECK(src_col->length() == dst_col->length()); - CHECK(src_col->type() == arrow::int64()); - CHECK(dst_col->type() == arrow::int64()); - auto src_casted_array = - std::static_pointer_cast(src_col); - auto dst_casted_array = - std::static_pointer_cast(dst_col); t = -grape::GetCurrentTime(); - append_edges(src_casted_array, dst_casted_array, src_indexer, dst_indexer, - property_cols, parsed_edges, ie_degree, oe_degree); + append_edges(src_col, dst_col, src_indexer, dst_indexer, property_cols, + parsed_edges, ie_degree, oe_degree); t += grape::GetCurrentTime(); for (double tmp = convert_to_internal_edge_time_; !convert_to_internal_edge_time_.compare_exchange_weak(tmp, tmp + t); diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h index 54433e3876ae..842fd929822e 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h @@ -65,25 +65,29 @@ class CSVFragmentLoader : public IFragmentLoader { void addVertices(label_t v_label_id, const std::vector& v_files); + template void addVerticesImpl(label_t v_label_id, const std::string& v_label_name, const std::vector v_file, - IdIndexer& indexer); - + IdIndexer& indexer); + template void addVerticesImplWithStreamReader(const std::string& filename, label_t v_label_id, - IdIndexer& indexer); + IdIndexer& indexer); + template void addVerticesImplWithTableReader(const std::string& filename, label_t v_label_id, - IdIndexer& indexer); + IdIndexer& indexer); + template void addVertexBatch( - label_t v_label_id, IdIndexer& indexer, + label_t v_label_id, IdIndexer& indexer, std::shared_ptr& primary_key_col, const std::vector>& property_cols); + template void addVertexBatch( - label_t v_label_id, IdIndexer& indexer, + label_t v_label_id, IdIndexer& indexer, std::shared_ptr& primary_key_col, const std::vector>& property_cols); diff --git a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc index b4b6142ab7a8..a83d1df28fd7 100644 --- a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc +++ b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc @@ -210,16 +210,16 @@ vid_t MutablePropertyFragment::vertex_num(label_t vertex_label) const { return static_cast(lf_indexers_[vertex_label].size()); } -bool MutablePropertyFragment::get_lid(label_t label, oid_t oid, +bool MutablePropertyFragment::get_lid(label_t label, const Any& oid, vid_t& lid) const { return lf_indexers_[label].get_index(oid, lid); } -oid_t MutablePropertyFragment::get_oid(label_t label, vid_t lid) const { +Any MutablePropertyFragment::get_oid(label_t label, vid_t lid) const { return lf_indexers_[label].get_key(lid); } -vid_t MutablePropertyFragment::add_vertex(label_t label, oid_t id) { +vid_t MutablePropertyFragment::add_vertex(label_t label, const Any& id) { return lf_indexers_[label].insert(id); } diff --git a/flex/storages/rt_mutable_graph/mutable_property_fragment.h b/flex/storages/rt_mutable_graph/mutable_property_fragment.h index 6fd2a7f4ae59..e530e7078909 100644 --- a/flex/storages/rt_mutable_graph/mutable_property_fragment.h +++ b/flex/storages/rt_mutable_graph/mutable_property_fragment.h @@ -55,11 +55,11 @@ class MutablePropertyFragment { vid_t vertex_num(label_t vertex_label) const; - bool get_lid(label_t label, oid_t oid, vid_t& lid) const; + bool get_lid(label_t label, const Any& oid, vid_t& lid) const; - oid_t get_oid(label_t label, vid_t lid) const; + Any get_oid(label_t label, vid_t lid) const; - vid_t add_vertex(label_t label, oid_t id); + vid_t add_vertex(label_t label, const Any& id); std::shared_ptr get_outgoing_edges( label_t label, vid_t u, label_t neighbor_label, label_t edge_label) const; diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index 722d718094aa..d39cdcf0f717 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -246,8 +246,9 @@ void Schema::Serialize(std::unique_ptr& writer) { vlabel_indexer_.Serialize(writer); elabel_indexer_.Serialize(writer); grape::InArchive arc; - arc << vproperties_ << vprop_storage_ << eproperties_ << ie_strategy_ - << oe_strategy_ << max_vnum_; + arc << v_primary_keys_ << vproperties_ << vprop_names_ << vprop_storage_ + << eproperties_ << eprop_names_ << ie_strategy_ << oe_strategy_ + << max_vnum_ << plugin_dir_ << plugin_list_; CHECK(writer->WriteArchive(arc)); } @@ -256,8 +257,9 @@ void Schema::Deserialize(std::unique_ptr& reader) { elabel_indexer_.Deserialize(reader); grape::OutArchive arc; CHECK(reader->ReadArchive(arc)); - arc >> vproperties_ >> vprop_storage_ >> eproperties_ >> ie_strategy_ >> - oe_strategy_ >> max_vnum_; + arc >> v_primary_keys_ >> vproperties_ >> vprop_names_ >> vprop_storage_ >> + eproperties_ >> eprop_names_ >> ie_strategy_ >> oe_strategy_ >> + max_vnum_ >> plugin_dir_ >> plugin_list_; } label_t Schema::vertex_label_to_index(const std::string& label) { @@ -561,8 +563,10 @@ static bool parse_vertex_schema(YAML::Node node, Schema& schema) { << " is not found in properties"; return false; } - if (property_types[primary_key_inds[i]] != PropertyType::kInt64) { - LOG(ERROR) << "Primary key " << primary_key_name << " should be int64"; + if (property_types[primary_key_inds[i]] != PropertyType::kInt64 && + property_types[primary_key_inds[i]] != PropertyType::kString) { + LOG(ERROR) << "Primary key " << primary_key_name + << " should be int64 or string"; return false; } primary_keys.emplace_back(property_types[primary_key_inds[i]], diff --git a/flex/storages/rt_mutable_graph/types.h b/flex/storages/rt_mutable_graph/types.h index 2b73eeaf83f4..74079e257edf 100644 --- a/flex/storages/rt_mutable_graph/types.h +++ b/flex/storages/rt_mutable_graph/types.h @@ -28,7 +28,6 @@ enum class EdgeStrategy { using timestamp_t = uint32_t; using vid_t = uint32_t; -using oid_t = int64_t; using label_t = uint8_t; static constexpr const char* DT_SIGNED_INT32 = "DT_SIGNED_INT32"; diff --git a/flex/tests/hqps/match_query.h b/flex/tests/hqps/match_query.h index e41c81a650d6..198393d1431d 100644 --- a/flex/tests/hqps/match_query.h +++ b/flex/tests/hqps/match_query.h @@ -55,7 +55,6 @@ struct Query5expr1 { class MatchQuery : public HqpsAppBase { public: using GRAPH_INTERFACE = gs::MutableCSRInterface; - using oid_t = typename GRAPH_INTERFACE::outer_vertex_id_t; using vertex_id_t = typename GRAPH_INTERFACE::vertex_id_t; public: diff --git a/flex/tests/hqps/sample_query.h b/flex/tests/hqps/sample_query.h index b3536707b8e1..0750d28ff9a4 100644 --- a/flex/tests/hqps/sample_query.h +++ b/flex/tests/hqps/sample_query.h @@ -25,18 +25,17 @@ namespace gs { struct Expression1 { public: using result_t = bool; - Expression1(oid_t oid) : oid_(oid) {} + Expression1(int64_t oid) : oid_(oid) {} - inline bool operator()(oid_t data) const { return oid_ == data; } + inline bool operator()(int64_t data) const { return oid_ == data; } private: - oid_t oid_; + int64_t oid_; }; class SampleQuery : public HqpsAppBase { public: using GRAPH_INTERFACE = gs::MutableCSRInterface; - using oid_t = typename GRAPH_INTERFACE::outer_vertex_id_t; using vertex_id_t = typename GRAPH_INTERFACE::vertex_id_t; public: @@ -59,7 +58,7 @@ class SampleQuery : public HqpsAppBase { using Engine = SyncEngine; auto filter = - gs::make_filter(Expression1(id), gs::PropertySelector("id")); + gs::make_filter(Expression1(id), gs::PropertySelector("id")); auto ctx0 = Engine::template ScanVertex( graph, person_label_id, std::move(filter)); @@ -76,7 +75,7 @@ class SampleQuery : public HqpsAppBase { gs::OrderingPropPair pair0( "creationDate"); // creationDate. - gs::OrderingPropPair pair1( + gs::OrderingPropPair pair1( "id"); auto ctx4 = Engine::Sort(graph, std::move(ctx3), gs::Range(0, 20), std::tuple{pair0, pair1}); @@ -84,13 +83,13 @@ class SampleQuery : public HqpsAppBase { // project // double t3 = -grape::GetCurrentTime(); auto mapper1 = gs::make_mapper_with_variable( - PropertySelector("id")); + PropertySelector("id")); auto mapper2 = gs::make_mapper_with_variable( PropertySelector("firstName")); auto mapper3 = gs::make_mapper_with_variable( PropertySelector("lastName")); auto mapper4 = gs::make_mapper_with_variable( - PropertySelector("id")); + PropertySelector("id")); auto mapper5 = gs::make_mapper_with_variable( PropertySelector("content")); auto mapper6 = gs::make_mapper_with_variable( diff --git a/flex/utils/id_indexer.h b/flex/utils/id_indexer.h index 08c58c693f69..60641d0caefa 100644 --- a/flex/utils/id_indexer.h +++ b/flex/utils/id_indexer.h @@ -26,6 +26,8 @@ limitations under the License. #include "flat_hash_map/flat_hash_map.hpp" #include "flex/utils/mmap_array.h" +#include "flex/utils/property/column.h" +#include "flex/utils/property/types.h" #include "flex/utils/string_view_vector.h" #include "glog/logging.h" #include "grape/io/local_io_adaptor.h" @@ -154,35 +156,68 @@ struct GHash { } }; +template <> +struct GHash { + size_t operator()(const Any& val) const { + if (val.type == PropertyType::kInt64) { + return GHash()(val.AsInt64()); + } else { + return GHash()(val.AsString()); + } + } +}; + template class IdIndexer; template class LFIndexer; -template -void build_lf_indexer(const IdIndexer& input, +template +void build_lf_indexer(const IdIndexer& input, LFIndexer& output, double rate = 0.8); template class LFIndexer { public: - LFIndexer() : num_elements_(0), hasher_() {} - LFIndexer(const LFIndexer& rhs) + LFIndexer() : num_elements_(0), hasher_(), keys_(nullptr) {} + LFIndexer(LFIndexer&& rhs) : keys_(rhs.keys_), indices_(rhs.indices_), num_elements_(rhs.num_elements_.load()), num_slots_minus_one_(rhs.num_slots_minus_one_), hasher_(rhs.hasher_) { + if (keys_ != rhs.keys_) { + if (keys_ != nullptr) { + delete keys_; + } + keys_ = rhs.keys_; + } hash_policy_.set_mod_function_by_index( rhs.hash_policy_.get_mod_function_index()); } + void init(const PropertyType& type) { + if (keys_ != nullptr) { + delete keys_; + } + keys_ = nullptr; + if (type == PropertyType::kInt64) { + keys_ = new TypedColumn(StorageStrategy::kMem); + } else if (type == PropertyType::kString) { + keys_ = new TypedColumn(StorageStrategy::kMem); + } else { + LOG(FATAL) << "Not support type [" << type << "] as pk type .."; + } + } + size_t size() const { return num_elements_.load(); } + PropertyType get_type() const { return keys_->type(); } - INDEX_T insert(int64_t oid) { + INDEX_T insert(const Any& oid) { + assert(oid.type == get_type()); INDEX_T ind = static_cast(num_elements_.fetch_add(1)); - keys_[ind] = oid; + keys_->set_any(ind, oid); size_t index = hash_policy_.index_for_hash(hasher_(oid), num_slots_minus_one_); static constexpr INDEX_T sentinel = std::numeric_limits::max(); @@ -195,15 +230,16 @@ class LFIndexer { return ind; } - INDEX_T get_index(int64_t oid) const { + INDEX_T get_index(const Any& oid) const { + assert(oid.type == get_type()); size_t index = hash_policy_.index_for_hash(hasher_(oid), num_slots_minus_one_); static constexpr INDEX_T sentinel = std::numeric_limits::max(); while (true) { INDEX_T ind = indices_[index]; if (ind == sentinel) { - LOG(FATAL) << "cannot find " << oid << " in id_indexer"; - } else if (keys_[ind] == oid) { + LOG(FATAL) << "cannot find " << oid.to_string() << " in id_indexer"; + } else if (keys_->get(ind) == oid) { return ind; } else { index = (index + 1) % num_slots_minus_one_; @@ -211,7 +247,9 @@ class LFIndexer { } } - bool get_index(int64_t oid, INDEX_T& ret) const { + bool get_index(const Any& oid, INDEX_T& ret) const { + assert(oid.type == get_type()); + size_t index = hash_policy_.index_for_hash(hasher_(oid), num_slots_minus_one_); static constexpr INDEX_T sentinel = std::numeric_limits::max(); @@ -219,7 +257,7 @@ class LFIndexer { INDEX_T ind = indices_[index]; if (ind == sentinel) { return false; - } else if (keys_[ind] == oid) { + } else if (keys_->get(ind) == oid) { ret = ind; return true; } else { @@ -229,12 +267,12 @@ class LFIndexer { return false; } - int64_t get_key(const INDEX_T& index) const { return keys_[index]; } + Any get_key(const INDEX_T& index) const { return keys_->get(index); } void Serialize(const std::string& prefix) { { grape::InArchive arc; - arc << keys_.size() << indices_.size(); + arc << get_type() << keys_->size() << indices_.size(); arc << hash_policy_.get_mod_function_index() << num_elements_.load() << num_slots_minus_one_ << indices_size_; std::string meta_file_path = prefix + ".meta"; @@ -244,8 +282,8 @@ class LFIndexer { fclose(fout); } - if (keys_.size() > 0) { - keys_.dump_to_file(prefix + ".keys"); + if (keys_->size() > 0) { + keys_->Serialize(prefix + ".keys", keys_->size()); } if (indices_.size() > 0) { indices_.dump_to_file(prefix + ".indices"); @@ -265,13 +303,15 @@ class LFIndexer { meta_file_size); grape::OutArchive arc; arc.SetSlice(buf.data(), meta_file_size); - - arc >> keys_size >> indices_size; + PropertyType type; + arc >> type >> keys_size >> indices_size; arc >> mod_function_index >> num_elements >> num_slots_minus_one_ >> indices_size_; + init(type); } - keys_.open_for_read(prefix + ".keys"); - CHECK_EQ(keys_.size(), keys_size); + + keys_->Deserialize(prefix + ".keys"); + CHECK_EQ(keys_->size(), keys_size); indices_.open_for_read(prefix + ".indices"); CHECK_EQ(indices_.size(), indices_size); hash_policy_.set_mod_function_by_index(mod_function_index); @@ -279,24 +319,35 @@ class LFIndexer { } // get keys - const mmap_array& get_keys() const { return keys_; } + const ColumnBase& get_keys() const { return *keys_; } private: - mmap_array keys_; + ColumnBase* keys_; + mmap_array indices_; std::atomic num_elements_; size_t num_slots_minus_one_; size_t indices_size_; ska::ska::prime_number_hash_policy hash_policy_; - GHash hasher_; + GHash hasher_; - template - friend void build_lf_indexer(const IdIndexer& input, + template + friend void build_lf_indexer(const IdIndexer<_KEY_T, _INDEX_T>& input, LFIndexer<_INDEX_T>& output, double rate); }; - +template +class IdIndexerBase { + public: + IdIndexerBase() {} + virtual PropertyType get_type() const = 0; + virtual void _add(const Any& oid) = 0; + virtual bool add(const Any& oid, INDEX_T& lid) = 0; + virtual bool get_key(const INDEX_T& lid, Any& oid) const = 0; + virtual bool get_index(const Any& oid, INDEX_T& lid) const = 0; + virtual size_t size() const = 0; +}; template -class IdIndexer { +class IdIndexer : public IdIndexerBase { public: using key_buffer_t = typename id_indexer_impl::KeyBuffer::type; using ind_buffer_t = std::vector; @@ -305,6 +356,36 @@ class IdIndexer { IdIndexer() : hasher_() { reset_to_empty_state(); } ~IdIndexer() {} + PropertyType get_type() const override { return AnyConverter::type; } + + void _add(const Any& oid) override { + assert(get_type() == oid.type); + KEY_T oid_; + ConvertAny::to(oid, oid_); + _add(oid_); + } + + bool add(const Any& oid, INDEX_T& lid) override { + assert(get_type() == oid.type); + KEY_T oid_; + ConvertAny::to(oid, oid_); + return add(oid_, lid); + } + + bool get_key(const INDEX_T& lid, Any& oid) const override { + KEY_T oid_; + bool flag = get_key(lid, oid_); + oid = Any::From(oid_); + return flag; + } + + bool get_index(const Any& oid, INDEX_T& lid) const override { + assert(get_type() == oid.type); + KEY_T oid_; + ConvertAny::to(oid, oid_); + return get_index(oid_, lid); + } + size_t entry_num() const { return distances_.size(); } bool add(const KEY_T& oid, INDEX_T& lid) { @@ -439,7 +520,7 @@ class IdIndexer { bool empty() const { return (num_elements_ == 0); } - size_t size() const { return num_elements_; } + size_t size() const override { return num_elements_; } bool get_key(INDEX_T lid, KEY_T& oid) const { if (static_cast(lid) >= num_elements_) { @@ -657,20 +738,49 @@ class IdIndexer { ska::ska::prime_number_hash_policy hash_policy_; int8_t max_lookups_ = id_indexer_impl::min_lookups - 1; + size_t num_elements_ = 0; size_t num_slots_minus_one_ = 0; - // std::hash hasher_; GHash hasher_; - template - friend void build_lf_indexer(const IdIndexer& input, + template + friend void build_lf_indexer(const IdIndexer<_KEY_T, _INDEX_T>& input, LFIndexer<_INDEX_T>& output, double rate); }; +template +struct _move_data { + using key_buffer_t = typename id_indexer_impl::KeyBuffer::type; + void operator()(const key_buffer_t& input, ColumnBase& lf, size_t size) {} +}; -template -void build_lf_indexer(const IdIndexer& input, +template +struct _move_data { + using key_buffer_t = typename id_indexer_impl::KeyBuffer::type; + void operator()(const key_buffer_t& input, ColumnBase& col, size_t size) { + // size_t size = input.keys_.size(); + auto& buffer = dynamic_cast&>(col); + memcpy(buffer.buffer().data(), input.data(), sizeof(int64_t) * size); + } +}; + +template +struct _move_data { + using key_buffer_t = + typename id_indexer_impl::KeyBuffer::type; + void operator()(const key_buffer_t& input, ColumnBase& col, size_t size) { + // size_t size = input.keys_.size(); + auto& keys = dynamic_cast&>(col); + for (size_t idx = 0; idx < size; ++idx) { + keys.set_value(idx, input[idx]); + } + } +}; + +template +void build_lf_indexer(const IdIndexer& input, LFIndexer& lf, double rate) { + lf.init(AnyConverter::type); double indices_rate = static_cast(input.keys_.size()) / static_cast(input.indices_.size()); CHECK_LT(indices_rate, rate); @@ -679,8 +789,8 @@ void build_lf_indexer(const IdIndexer& input, size_t lf_size = static_cast(size) / rate + 1; lf_size = std::max(lf_size, static_cast(1024)); - lf.keys_.resize(lf_size); - memcpy(lf.keys_.data(), input.keys_.data(), sizeof(int64_t) * size); + lf.keys_->resize(lf_size); + _move_data()(input.keys_, *lf.keys_, size); lf.num_elements_.store(size); @@ -692,8 +802,9 @@ void build_lf_indexer(const IdIndexer& input, lf.hash_policy_.set_mod_function_by_index( input.hash_policy_.get_mod_function_index()); lf.num_slots_minus_one_ = input.num_slots_minus_one_; - std::vector> res; - for (auto oid : input.keys_) { + std::vector> res; + for (auto idx = 0; idx < input.keys_.size(); ++idx) { + auto oid = input.keys_[idx]; size_t index = input.hash_policy_.index_for_hash( input.hasher_(oid), input.num_slots_minus_one_); for (int8_t distance = 0; input.distances_[index] >= distance; diff --git a/flex/utils/property/column.cc b/flex/utils/property/column.cc index 0c4fac58877b..8bd657f7e901 100644 --- a/flex/utils/property/column.cc +++ b/flex/utils/property/column.cc @@ -38,6 +38,12 @@ class TypedEmptyColumn : public ColumnBase { Any get(size_t index) const override { return Any(); } + size_t size() const override { return 0; } + + void clear() override {} + + void resize(size_t) override {} + void Serialize(const std::string& path, size_t size) override {} void Deserialize(const std::string& path) override {} diff --git a/flex/utils/property/column.h b/flex/utils/property/column.h index b79356e5f576..7fbcd68d819b 100644 --- a/flex/utils/property/column.h +++ b/flex/utils/property/column.h @@ -37,6 +37,12 @@ class ColumnBase { virtual Any get(size_t index) const = 0; + virtual size_t size() const = 0; + + virtual void clear() = 0; + + virtual void resize(size_t size) = 0; + virtual void ingest(uint32_t index, grape::OutArchive& arc) = 0; virtual void Serialize(const std::string& filename, size_t size) = 0; @@ -67,6 +73,7 @@ class TypedColumn : public ColumnBase { Any get(size_t index) const override { return AnyConverter::to_any(buffer_[index]); } + size_t size() const override { return buffer_.size(); } void Serialize(const std::string& path, size_t size) override { buffer_.dump_to_file(path, size); @@ -76,6 +83,10 @@ class TypedColumn : public ColumnBase { buffer_.open_for_read(path); } + void clear() override { buffer_.clear(); } + + void resize(size_t size) override { buffer_.resize(size); } + void ingest(uint32_t index, grape::OutArchive& arc) override { T val; arc >> val; diff --git a/flex/utils/property/types.h b/flex/utils/property/types.h index 56b178297756..a71911bab6ad 100644 --- a/flex/utils/property/types.h +++ b/flex/utils/property/types.h @@ -67,6 +67,13 @@ struct AnyConverter; struct Any { Any() : type(PropertyType::kEmpty) {} + + template + Any(const T& val) { + Any a = Any::From(val); + memcpy(this, &a, sizeof(a)); + } + ~Any() {} int64_t get_long() const { @@ -175,6 +182,29 @@ struct Any { } } + bool operator<(const Any& other) const { + if (type == other.type) { + if (type == PropertyType::kInt32) { + return value.i < other.value.i; + } else if (type == PropertyType::kInt64) { + return value.l < other.value.l; + } else if (type == PropertyType::kDate) { + return value.d.milli_second < other.value.d.milli_second; + } else if (type == PropertyType::kString) { + return value.s < other.value.s; + } else if (type == PropertyType::kEmpty) { + return false; + } else if (type == PropertyType::kDouble) { + return value.db < other.value.db; + } else { + return false; + } + } else { + LOG(FATAL) << "Type [" << static_cast(type) << "] and [" + << static_cast(other.type) << "] cannot be compared.."; + } + } + PropertyType type; AnyValue value; }; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java index 6470b3813ff1..c585711d4322 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java @@ -67,7 +67,7 @@ public static QueryContext get_simple_match_query_5_test() { public static QueryContext get_simple_match_query_6_test() { String query = - "MATCH(a: PERSON) where a.id = 933 return a.firstName AS firstName, a.lastName as" + "MATCH(a: PERSON) where a.id = 933L return a.firstName AS firstName, a.lastName as" + " lastName;"; List expected = Arrays.asList("Record<{firstName: \"Mahinda\", lastName: \"Perera\"}>");