Skip to content

Commit

Permalink
fix(interactive): fix some update interface not work (alibaba#3987)
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 authored Jul 1, 2024
1 parent 9bcfc42 commit c4f17a7
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 9 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,18 @@ jobs:
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/
GLOG_v=10 ./tests/rt_mutable_graph/string_edge_property_test ${SCHEMA_FILE} /tmp/csr-data-dir/

- name: Test update transaction
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/
run: |
rm -rf /tmp/csr-data-dir/
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=../tests/rt_mutable_graph/modern_graph_string_edge.yaml
BULK_LOAD_FILE=../interactive/examples/modern_graph/bulk_load.yaml
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/
GLOG_v=10 ./tests/rt_mutable_graph/test_update_transaction ${SCHEMA_FILE} /tmp/csr-data-dir/
- name: Test multiple properties edge
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/
Expand Down
12 changes: 8 additions & 4 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ GraphDB::~GraphDB() {
compact_thread_running_ = false;
compact_thread_.join();
}
showAppMetrics();
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].~SessionLocalContext();
if (contexts_ != nullptr) {
showAppMetrics();
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].~SessionLocalContext();
}

free(contexts_);
}
free(contexts_);
}

GraphDB& GraphDB::get() {
Expand Down Expand Up @@ -245,6 +248,7 @@ void GraphDB::Close() {
contexts_[i].~SessionLocalContext();
}
free(contexts_);
contexts_ = nullptr;
}
std::fill(app_paths_.begin(), app_paths_.end(), "");
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
Expand Down
169 changes: 169 additions & 0 deletions flex/tests/rt_mutable_graph/test_update_transaction.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "grape/util.h"

#include "flex/engines/graph_db/database/graph_db.h"

#include <glog/logging.h>

namespace gs {
class TestUpdateTransaction {
public:
TestUpdateTransaction(GraphDB& db)
: db_(db),
src_label_(db.graph().schema().get_vertex_label_id("person")),
dst_label_(db.graph().schema().get_vertex_label_id("software")),
edge_label_(db.graph().schema().get_edge_label_id("created")) {}

void test() {
const std::string name = "unknown";
const int age = 32;
const std::string data = "0.35";
test_set_vertex_field(name, age);
test_set_edge_data(data);
}

void test_set_vertex_field(const std::string& name, int age) {
std::string_view original_name;
int original_age;
{
auto txn = db_.GetUpdateTransaction();
auto it = txn.GetVertexIterator(src_label_);
while (it.GetId().AsInt64() != 1) {
it.Next();
}
original_name = it.GetField(0).AsStringView();
original_age = it.GetField(1).AsInt32();

it.SetField(0, name);
it.SetField(1, age);
txn.Abort();
}
{
auto txn = db_.GetReadTransaction();
int64_t oid = 1;
auto it = txn.FindVertex(src_label_, oid);
CHECK(it.GetField(0).AsStringView() == original_name);
CHECK(it.GetField(1).AsInt32() == original_age);
}

{
auto txn = db_.GetUpdateTransaction();
auto it = txn.GetVertexIterator(src_label_);
while (it.GetId().AsInt64() != 1) {
it.Next();
}
original_name = it.GetField(0).AsStringView();
original_age = it.GetField(1).AsInt32();

it.SetField(0, name);
it.SetField(1, age);
txn.Commit();
}

{
auto txn = db_.GetReadTransaction();
int64_t oid = 1;
auto it = txn.FindVertex(src_label_, oid);
CHECK(it.GetField(0).AsStringView() == name);
CHECK(it.GetField(1).AsInt32() == age);
}
LOG(INFO) << "Finish test set vertex field\n";
}

void test_set_edge_data(const std::string& data) {
std::string_view original_data;
vid_t neighbor;
{
auto txn = db_.GetUpdateTransaction();

auto it = txn.GetOutEdgeIterator(src_label_, 0, dst_label_, edge_label_);
neighbor = it.GetNeighbor();
original_data = it.GetData().AsStringView();
it.SetData(data);
txn.Abort();
}
{
auto txn = db_.GetReadTransaction();
auto es = txn.GetOutgoingEdges<std::string_view>(src_label_, 0,
dst_label_, edge_label_);
for (auto& e : es) {
if (e.get_neighbor() == neighbor) {
CHECK(e.get_data() == original_data);
}
}
}

{
auto txn = db_.GetUpdateTransaction();

auto it = txn.GetOutEdgeIterator(src_label_, 0, dst_label_, edge_label_);
neighbor = it.GetNeighbor();
original_data = it.GetData().AsStringView();
it.SetData(data);
txn.Commit();
}

{
auto txn = db_.GetReadTransaction();
auto es = txn.GetOutgoingEdges<std::string_view>(src_label_, 0,
dst_label_, edge_label_);
for (auto& e : es) {
if (e.get_neighbor() == neighbor) {
CHECK(e.get_data() == data);
}
}
}
LOG(INFO) << "Finish test set edge data\n";
}

private:
GraphDB& db_;
label_t src_label_;
label_t dst_label_;
label_t edge_label_;
};

} // namespace gs
// ./string_edge_property_test graph.yaml data_dir
int main(int argc, char** argv) {
bool warmup = false;
uint32_t shard_num = 1;

std::string graph_schema_path = "";
std::string data_path = "";

graph_schema_path = argv[1];
data_path = argv[2];

double t0 = -grape::GetCurrentTime();
auto& db = gs::GraphDB::get();

auto schema_res = gs::Schema::LoadFromYaml(graph_schema_path);
if (!schema_res.ok()) {
LOG(ERROR) << "Fail to load graph schema file: "
<< schema_res.status().error_message();
return -1;
}
db.Open(schema_res.value(), data_path, shard_num, warmup, true);

t0 += grape::GetCurrentTime();

LOG(INFO) << "Finished loading graph, elapsed " << t0 << " s";
gs::TestUpdateTransaction(db).test();
db.Close();
return 0;
}
36 changes: 31 additions & 5 deletions flex/utils/property/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ class TypedColumn : public ColumnBase {
PropertyType type() const override { return AnyConverter<T>::type(); }

void set_value(size_t index, const T& val) {
assert(index >= basic_size_ && index < basic_size_ + extra_size_);
extra_buffer_.set(index - basic_size_, val);
if (index >= basic_size_ && index < basic_size_ + extra_size_) {
extra_buffer_.set(index - basic_size_, val);
} else if (index < basic_size_) {
basic_buffer_.set(index, val);
} else {
LOG(FATAL) << "Index out of range";
}
}

void set_any(size_t index, const Any& value) override {
Expand Down Expand Up @@ -246,8 +251,10 @@ class TypedColumn<std::string_view> : public ColumnBase {
if (std::filesystem::exists(basic_path + ".items")) {
basic_buffer_.open(basic_path, false);
basic_size_ = basic_buffer_.size();
basic_pos_ = basic_buffer_.data_size();
} else {
basic_size_ = 0;
basic_pos_ = 0;
}
if (work_dir == "") {
extra_size_ = 0;
Expand All @@ -262,6 +269,7 @@ class TypedColumn<std::string_view> : public ColumnBase {
void open_in_memory(const std::string& prefix) override {
basic_buffer_.open(prefix, false);
basic_size_ = basic_buffer_.size();
basic_pos_ = basic_buffer_.data_size();

extra_buffer_.reset();
extra_size_ = 0;
Expand All @@ -272,6 +280,7 @@ class TypedColumn<std::string_view> : public ColumnBase {
if (strategy_ == StorageStrategy::kMem || force) {
basic_buffer_.open_with_hugepages(prefix);
basic_size_ = basic_buffer_.size();
basic_pos_ = basic_buffer_.data_size();

extra_buffer_.reset();
extra_buffer_.set_hugepage_prefered(true);
Expand Down Expand Up @@ -300,6 +309,7 @@ class TypedColumn<std::string_view> : public ColumnBase {
}

basic_size_ = 0;
basic_pos_ = 0;
basic_buffer_.reset();
extra_size_ = tmp.size();
extra_buffer_.swap(tmp);
Expand All @@ -324,6 +334,7 @@ class TypedColumn<std::string_view> : public ColumnBase {

extra_size_ = basic_size_ + extra_size_;
basic_size_ = 0;
basic_pos_ = 0;
basic_buffer_.reset();
tmp.open(tmp_path, true);
extra_buffer_.swap(tmp);
Expand All @@ -333,6 +344,7 @@ class TypedColumn<std::string_view> : public ColumnBase {

void dump(const std::string& filename) override {
if (basic_size_ != 0 && extra_size_ == 0) {
basic_buffer_.resize(basic_size_, basic_pos_.load());
basic_buffer_.dump(filename);
} else if (basic_size_ == 0 && extra_size_ != 0) {
extra_buffer_.resize(extra_size_, pos_.load());
Expand Down Expand Up @@ -376,14 +388,26 @@ class TypedColumn<std::string_view> : public ColumnBase {
extra_buffer_.resize(extra_size_, extra_size_ * width_);
}
}
// resize `data` of basic_buffer
{
size_t pos = basic_pos_.load();
pos = pos + (pos + 4) / 5;
basic_buffer_.resize(basic_size_, pos);
}
}

PropertyType type() const override { return PropertyType::Varchar(width_); }

void set_value(size_t idx, const std::string_view& val) {
assert(idx >= basic_size_ && idx < basic_size_ + extra_size_);
size_t offset = pos_.fetch_add(val.size());
extra_buffer_.set(idx - basic_size_, offset, val);
if (idx >= basic_size_ && idx < basic_size_ + extra_size_) {
size_t offset = pos_.fetch_add(val.size());
extra_buffer_.set(idx - basic_size_, offset, val);
} else if (idx < basic_size_) {
size_t offset = basic_pos_.fetch_add(val.size());
basic_buffer_.set(idx, offset, val);
} else {
LOG(FATAL) << "Index out of range";
}
}

void set_any(size_t idx, const Any& value) override {
Expand All @@ -404,6 +428,7 @@ class TypedColumn<std::string_view> : public ColumnBase {
arc >> val;
set_value(index, val);
}

const mmap_array<std::string_view>& basic_buffer() const {
return basic_buffer_;
}
Expand All @@ -424,6 +449,7 @@ class TypedColumn<std::string_view> : public ColumnBase {
mmap_array<std::string_view> extra_buffer_;
size_t extra_size_;
std::atomic<size_t> pos_;
std::atomic<size_t> basic_pos_;
StorageStrategy strategy_;
uint16_t width_;
};
Expand Down

0 comments on commit c4f17a7

Please sign in to comment.