From c4f17a7aa20b69c6d9f6cdf2872504d18021a4a1 Mon Sep 17 00:00:00 2001 From: liulx20 <68941872+liulx20@users.noreply.github.com> Date: Mon, 1 Jul 2024 15:27:39 +0800 Subject: [PATCH] fix(interactive): fix some update interface not work (#3987) --- .github/workflows/interactive.yml | 12 ++ flex/engines/graph_db/database/graph_db.cc | 12 +- .../test_update_transaction.cc | 169 ++++++++++++++++++ flex/utils/property/column.h | 36 +++- 4 files changed, 220 insertions(+), 9 deletions(-) create mode 100644 flex/tests/rt_mutable_graph/test_update_transaction.cc diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index b181ed1f40e9..db8266f65c84 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -468,6 +468,18 @@ jobs: GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ GLOG_v=10 ./tests/rt_mutable_graph/string_edge_property_test ${SCHEMA_FILE} /tmp/csr-data-dir/ + + - name: Test update transaction + env: + FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/ + run: | + rm -rf /tmp/csr-data-dir/ + cd ${GITHUB_WORKSPACE}/flex/build/ + SCHEMA_FILE=../tests/rt_mutable_graph/modern_graph_string_edge.yaml + BULK_LOAD_FILE=../interactive/examples/modern_graph/bulk_load.yaml + GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ + GLOG_v=10 ./tests/rt_mutable_graph/test_update_transaction ${SCHEMA_FILE} /tmp/csr-data-dir/ + - name: Test multiple properties edge env: FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/ diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc index b34c6908ffe8..e761a83c3551 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -48,11 +48,14 @@ GraphDB::~GraphDB() { compact_thread_running_ = false; compact_thread_.join(); } - showAppMetrics(); - for (int i = 0; i < thread_num_; ++i) { - contexts_[i].~SessionLocalContext(); + if (contexts_ != nullptr) { + showAppMetrics(); + for (int i = 0; i < thread_num_; ++i) { + contexts_[i].~SessionLocalContext(); + } + + free(contexts_); } - free(contexts_); } GraphDB& GraphDB::get() { @@ -245,6 +248,7 @@ void GraphDB::Close() { contexts_[i].~SessionLocalContext(); } free(contexts_); + contexts_ = nullptr; } std::fill(app_paths_.begin(), app_paths_.end(), ""); std::fill(app_factories_.begin(), app_factories_.end(), nullptr); diff --git a/flex/tests/rt_mutable_graph/test_update_transaction.cc b/flex/tests/rt_mutable_graph/test_update_transaction.cc new file mode 100644 index 000000000000..d2a13e6db79f --- /dev/null +++ b/flex/tests/rt_mutable_graph/test_update_transaction.cc @@ -0,0 +1,169 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "grape/util.h" + +#include "flex/engines/graph_db/database/graph_db.h" + +#include + +namespace gs { +class TestUpdateTransaction { + public: + TestUpdateTransaction(GraphDB& db) + : db_(db), + src_label_(db.graph().schema().get_vertex_label_id("person")), + dst_label_(db.graph().schema().get_vertex_label_id("software")), + edge_label_(db.graph().schema().get_edge_label_id("created")) {} + + void test() { + const std::string name = "unknown"; + const int age = 32; + const std::string data = "0.35"; + test_set_vertex_field(name, age); + test_set_edge_data(data); + } + + void test_set_vertex_field(const std::string& name, int age) { + std::string_view original_name; + int original_age; + { + auto txn = db_.GetUpdateTransaction(); + auto it = txn.GetVertexIterator(src_label_); + while (it.GetId().AsInt64() != 1) { + it.Next(); + } + original_name = it.GetField(0).AsStringView(); + original_age = it.GetField(1).AsInt32(); + + it.SetField(0, name); + it.SetField(1, age); + txn.Abort(); + } + { + auto txn = db_.GetReadTransaction(); + int64_t oid = 1; + auto it = txn.FindVertex(src_label_, oid); + CHECK(it.GetField(0).AsStringView() == original_name); + CHECK(it.GetField(1).AsInt32() == original_age); + } + + { + auto txn = db_.GetUpdateTransaction(); + auto it = txn.GetVertexIterator(src_label_); + while (it.GetId().AsInt64() != 1) { + it.Next(); + } + original_name = it.GetField(0).AsStringView(); + original_age = it.GetField(1).AsInt32(); + + it.SetField(0, name); + it.SetField(1, age); + txn.Commit(); + } + + { + auto txn = db_.GetReadTransaction(); + int64_t oid = 1; + auto it = txn.FindVertex(src_label_, oid); + CHECK(it.GetField(0).AsStringView() == name); + CHECK(it.GetField(1).AsInt32() == age); + } + LOG(INFO) << "Finish test set vertex field\n"; + } + + void test_set_edge_data(const std::string& data) { + std::string_view original_data; + vid_t neighbor; + { + auto txn = db_.GetUpdateTransaction(); + + auto it = txn.GetOutEdgeIterator(src_label_, 0, dst_label_, edge_label_); + neighbor = it.GetNeighbor(); + original_data = it.GetData().AsStringView(); + it.SetData(data); + txn.Abort(); + } + { + auto txn = db_.GetReadTransaction(); + auto es = txn.GetOutgoingEdges(src_label_, 0, + dst_label_, edge_label_); + for (auto& e : es) { + if (e.get_neighbor() == neighbor) { + CHECK(e.get_data() == original_data); + } + } + } + + { + auto txn = db_.GetUpdateTransaction(); + + auto it = txn.GetOutEdgeIterator(src_label_, 0, dst_label_, edge_label_); + neighbor = it.GetNeighbor(); + original_data = it.GetData().AsStringView(); + it.SetData(data); + txn.Commit(); + } + + { + auto txn = db_.GetReadTransaction(); + auto es = txn.GetOutgoingEdges(src_label_, 0, + dst_label_, edge_label_); + for (auto& e : es) { + if (e.get_neighbor() == neighbor) { + CHECK(e.get_data() == data); + } + } + } + LOG(INFO) << "Finish test set edge data\n"; + } + + private: + GraphDB& db_; + label_t src_label_; + label_t dst_label_; + label_t edge_label_; +}; + +} // namespace gs +// ./string_edge_property_test graph.yaml data_dir +int main(int argc, char** argv) { + bool warmup = false; + uint32_t shard_num = 1; + + std::string graph_schema_path = ""; + std::string data_path = ""; + + graph_schema_path = argv[1]; + data_path = argv[2]; + + double t0 = -grape::GetCurrentTime(); + auto& db = gs::GraphDB::get(); + + auto schema_res = gs::Schema::LoadFromYaml(graph_schema_path); + if (!schema_res.ok()) { + LOG(ERROR) << "Fail to load graph schema file: " + << schema_res.status().error_message(); + return -1; + } + db.Open(schema_res.value(), data_path, shard_num, warmup, true); + + t0 += grape::GetCurrentTime(); + + LOG(INFO) << "Finished loading graph, elapsed " << t0 << " s"; + gs::TestUpdateTransaction(db).test(); + db.Close(); + return 0; +} diff --git a/flex/utils/property/column.h b/flex/utils/property/column.h index df0d58089cd7..10c58043aad0 100644 --- a/flex/utils/property/column.h +++ b/flex/utils/property/column.h @@ -184,8 +184,13 @@ class TypedColumn : public ColumnBase { PropertyType type() const override { return AnyConverter::type(); } void set_value(size_t index, const T& val) { - assert(index >= basic_size_ && index < basic_size_ + extra_size_); - extra_buffer_.set(index - basic_size_, val); + if (index >= basic_size_ && index < basic_size_ + extra_size_) { + extra_buffer_.set(index - basic_size_, val); + } else if (index < basic_size_) { + basic_buffer_.set(index, val); + } else { + LOG(FATAL) << "Index out of range"; + } } void set_any(size_t index, const Any& value) override { @@ -246,8 +251,10 @@ class TypedColumn : public ColumnBase { if (std::filesystem::exists(basic_path + ".items")) { basic_buffer_.open(basic_path, false); basic_size_ = basic_buffer_.size(); + basic_pos_ = basic_buffer_.data_size(); } else { basic_size_ = 0; + basic_pos_ = 0; } if (work_dir == "") { extra_size_ = 0; @@ -262,6 +269,7 @@ class TypedColumn : public ColumnBase { void open_in_memory(const std::string& prefix) override { basic_buffer_.open(prefix, false); basic_size_ = basic_buffer_.size(); + basic_pos_ = basic_buffer_.data_size(); extra_buffer_.reset(); extra_size_ = 0; @@ -272,6 +280,7 @@ class TypedColumn : public ColumnBase { if (strategy_ == StorageStrategy::kMem || force) { basic_buffer_.open_with_hugepages(prefix); basic_size_ = basic_buffer_.size(); + basic_pos_ = basic_buffer_.data_size(); extra_buffer_.reset(); extra_buffer_.set_hugepage_prefered(true); @@ -300,6 +309,7 @@ class TypedColumn : public ColumnBase { } basic_size_ = 0; + basic_pos_ = 0; basic_buffer_.reset(); extra_size_ = tmp.size(); extra_buffer_.swap(tmp); @@ -324,6 +334,7 @@ class TypedColumn : public ColumnBase { extra_size_ = basic_size_ + extra_size_; basic_size_ = 0; + basic_pos_ = 0; basic_buffer_.reset(); tmp.open(tmp_path, true); extra_buffer_.swap(tmp); @@ -333,6 +344,7 @@ class TypedColumn : public ColumnBase { void dump(const std::string& filename) override { if (basic_size_ != 0 && extra_size_ == 0) { + basic_buffer_.resize(basic_size_, basic_pos_.load()); basic_buffer_.dump(filename); } else if (basic_size_ == 0 && extra_size_ != 0) { extra_buffer_.resize(extra_size_, pos_.load()); @@ -376,14 +388,26 @@ class TypedColumn : public ColumnBase { extra_buffer_.resize(extra_size_, extra_size_ * width_); } } + // resize `data` of basic_buffer + { + size_t pos = basic_pos_.load(); + pos = pos + (pos + 4) / 5; + basic_buffer_.resize(basic_size_, pos); + } } PropertyType type() const override { return PropertyType::Varchar(width_); } void set_value(size_t idx, const std::string_view& val) { - assert(idx >= basic_size_ && idx < basic_size_ + extra_size_); - size_t offset = pos_.fetch_add(val.size()); - extra_buffer_.set(idx - basic_size_, offset, val); + if (idx >= basic_size_ && idx < basic_size_ + extra_size_) { + size_t offset = pos_.fetch_add(val.size()); + extra_buffer_.set(idx - basic_size_, offset, val); + } else if (idx < basic_size_) { + size_t offset = basic_pos_.fetch_add(val.size()); + basic_buffer_.set(idx, offset, val); + } else { + LOG(FATAL) << "Index out of range"; + } } void set_any(size_t idx, const Any& value) override { @@ -404,6 +428,7 @@ class TypedColumn : public ColumnBase { arc >> val; set_value(index, val); } + const mmap_array& basic_buffer() const { return basic_buffer_; } @@ -424,6 +449,7 @@ class TypedColumn : public ColumnBase { mmap_array extra_buffer_; size_t extra_size_; std::atomic pos_; + std::atomic basic_pos_; StorageStrategy strategy_; uint16_t width_; };