Skip to content

Commit

Permalink
upsert with pair unique index
Browse files (browse the repository at this point in the history)
  • Loading branch information
ljcui committed Aug 15, 2024
1 parent 3f4b2e9 commit 4fdb984
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 31 deletions.
3 changes: 2 additions & 1 deletion include/lgraph/lgraph_txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ class Transaction {
int UpsertEdge(int64_t src, int64_t dst, size_t label_id,
const std::vector<size_t>& unique_pos,
const std::vector<size_t>& field_ids,
const std::vector<FieldData>& field_values);
const std::vector<FieldData>& field_values,
std::optional<size_t> pair_unique_pos);

/**
* @brief List indexes
Expand Down
8 changes: 4 additions & 4 deletions src/core/edge_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ std::unique_ptr<KvIterator> InitEdgeIndexIterator(KvTransaction& txn, KvTable& t
return std::unique_ptr<KvIterator>();
}

Value InitKeyEndValue(const Value& key, IndexType type) {
Value InitKeyEndValue(const Value& key, VertexId vid1, VertexId vid2, IndexType type) {
switch (type) {
case IndexType::GlobalUniqueIndex:
return Value::MakeCopy(key);
case IndexType::PairUniqueIndex:
return _detail::PatchPairUniqueIndexKey(key, -1, -1);
return _detail::PatchPairUniqueIndexKey(key, vid1, vid2);
case IndexType::NonuniqueIndex:
return _detail::PatchNonuniqueIndexKey(key, -1, -1, -1, -1, -1);
}
Expand Down Expand Up @@ -197,7 +197,7 @@ EdgeIndexIterator::EdgeIndexIterator(EdgeIndex* idx, Transaction* txn, KvTable&
index_(idx),
it_(_detail::InitEdgeIndexIterator(
txn->GetTxn(), table, key_start, vid, vid2, lid, tid, eid, type)),
key_end_(_detail::InitKeyEndValue(key_end, type)),
key_end_(_detail::InitKeyEndValue(key_end, vid, vid2, type)),
iv_(),
valid_(false),
pos_(0),
Expand All @@ -216,7 +216,7 @@ EdgeIndexIterator::EdgeIndexIterator(EdgeIndex* idx, KvTransaction* txn, KvTable
index_(idx),
it_(_detail::InitEdgeIndexIterator(
*txn, table, key_start, vid, vid2, lid, tid, eid, type)),
key_end_(_detail::InitKeyEndValue(key_end, type)),
key_end_(_detail::InitKeyEndValue(key_end, vid, vid2, type)),
iv_(),
valid_(false),
pos_(0),
Expand Down
4 changes: 2 additions & 2 deletions src/core/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,8 @@ EdgeIndexIterator Transaction::GetEdgePairUniqueIndexIterator(
size_t label_id, size_t field_id, VertexId src_vid, VertexId dst_vid,
const FieldData& key_start, const FieldData& key_end) {
EdgeIndex* index = GetEdgeIndex(label_id, field_id);
if (!index || !index->IsReady()) {
THROW_CODE(InputError, "EdgeIndex is not created for this field");
if (!index || !index->IsReady() || index->GetType() != IndexType::PairUniqueIndex) {
THROW_CODE(InputError, "Edge pair unique index is not created for this field");
}
Value ks, ke;
if (!key_start.IsNull()) {
Expand Down
85 changes: 72 additions & 13 deletions src/cypher/procedure/procedure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,13 +679,17 @@ void BuiltinProcedure::DbUpsertVertexByJson(RTContext *ctx, const Record *record
void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
const VEC_EXPR &args, const VEC_STR &yield_items,
std::vector<Record> *records) {
CYPHER_ARG_CHECK(args.size() == 4,
"need 4 parameters, "
"e.g. db.upsertEdge(label_name, start_spec, end_spec, list_data)")
CYPHER_ARG_CHECK(args.size() == 4 || args.size() == 5,
"need 4 or 5 parameters, "
"e.g. db.upsertEdge(label_name, start_spec, end_spec, list_data) or "
"db.upsertEdge(label_name, start_spec, end_spec, list_data, pair_unique_field)")
CYPHER_ARG_CHECK(args[0].IsString(), "label_name type should be string")
CYPHER_ARG_CHECK(args[1].IsMap(), "start_spec type should be map")
CYPHER_ARG_CHECK(args[2].IsMap(), "end_spec type should be map")
CYPHER_ARG_CHECK(args[3].IsArray(), "list_data type should be list")
if (args.size() == 5) {
CYPHER_ARG_CHECK(args[4].IsString(), "pair_unique_field type should be string")
}
CYPHER_DB_PROCEDURE_GRAPH_CHECK();
if (ctx->txn_) ctx->txn_->Abort();
const auto& start = *args[1].constant.map;
Expand All @@ -696,6 +700,10 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
if (!end.count("type") || !end.count("key")) {
THROW_CODE(InputError, "end_spec missing 'type' or 'key'");
}
std::string pair_unique_field;
if (args.size() == 5) {
pair_unique_field = args[4].constant.AsString();
}
std::string start_type = start.at("type").AsString();
std::string start_json_key = start.at("key").AsString();
std::string end_type = end.at("type").AsString();
Expand Down Expand Up @@ -736,11 +744,20 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,

auto index_fds = txn.GetTxn()->ListEdgeIndexByLabel(args[0].constant.AsString());
std::unordered_map<size_t, bool> unique_indexs;
bool pair_unique_configured = false;
for (auto& index : index_fds) {
if (index.type == lgraph_api::IndexType::GlobalUniqueIndex) {
unique_indexs[txn.GetEdgeFieldId(label_id, index.field)] = true;
} else if (index.type == lgraph_api::IndexType::PairUniqueIndex) {
if (!pair_unique_field.empty() && index.field == pair_unique_field) {
pair_unique_configured = true;
}
}
}
if (!pair_unique_field.empty() && !pair_unique_configured) {
THROW_CODE(InputError, "No edge pair unique index is configured for this field: {}",
pair_unique_field);
}

const auto& list = *args[3].constant.array;
int64_t json_total = list.size();
Expand All @@ -749,17 +766,24 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
int64_t insert = 0;
int64_t update = 0;
std::vector<std::tuple<int64_t , int64_t, std::vector<size_t>, std::vector<size_t>,
std::vector<lgraph_api::FieldData>>> lines;
std::vector<lgraph_api::FieldData>, std::optional<size_t> >> lines;
for (auto& line : list) {
int64_t start_vid = -1;
int64_t end_vid = -1;
std::vector<size_t> unique_pos;
std::optional<size_t> pair_unique_pos;
std::vector<size_t> field_ids;
std::vector<lgraph_api::FieldData> fds;
bool success = true;
if (!line.IsMap()) {
THROW_CODE(InputError, "The type of the elements in the list must be map");
}
if (!pair_unique_field.empty()) {
if (!line.map->count(pair_unique_field)) {
json_error++;
continue;
}
}
for (auto& item : *line.map) {
if (item.first == start_json_key) {
auto fd = item.second.scalar;
Expand Down Expand Up @@ -801,13 +825,17 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
field_ids.push_back(iter->second.first);
if (unique_indexs.count(iter->second.first)) {
unique_pos.push_back(field_ids.size() - 1);
} else {
if (!pair_unique_field.empty() && pair_unique_field == item.first) {
pair_unique_pos = field_ids.size() - 1;
}
}
}
}
}
if (success && start_vid >= 0 && end_vid >= 0) {
lines.emplace_back(start_vid, end_vid, std::move(unique_pos),
std::move(field_ids), std::move(fds));
std::move(field_ids), std::move(fds), pair_unique_pos);
} else {
json_error++;
}
Expand All @@ -816,7 +844,7 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
txn = db.CreateWriteTxn();
for (auto& l : lines) {
int ret = txn.UpsertEdge(std::get<0>(l), std::get<1>(l),
label_id, std::get<2>(l), std::get<3>(l), std::get<4>(l));
label_id, std::get<2>(l), std::get<3>(l), std::get<4>(l), std::get<5>(l));
if (ret == 0) {
index_conflict++;
} else if (ret == 1) {
Expand All @@ -838,13 +866,17 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,
const VEC_EXPR &args, const VEC_STR &yield_items,
std::vector<Record> *records) {
CYPHER_ARG_CHECK(args.size() == 4,
"need 4 parameters, "
"e.g. db.upsertEdgeByJson(label_name, start_spec, end_spec, list_data)")
CYPHER_ARG_CHECK(args.size() == 4 || args.size() == 5,
"need 4 or 5 parameters, "
"e.g. db.upsertEdgeByJson(label_name, start_spec, end_spec, list_data) or "
"db.upsertEdgeByJson(label_name, start_spec, end_spec, list_data, pair_unique_field)")
CYPHER_ARG_CHECK(args[0].IsString(), "label_name type should be string")
CYPHER_ARG_CHECK(args[1].IsString(), "start_spec type should be json string")
CYPHER_ARG_CHECK(args[2].IsString(), "end_spec type should be json string")
CYPHER_ARG_CHECK(args[3].IsString(), "list_data type should be json string")
if (args.size() == 5) {
CYPHER_ARG_CHECK(args[4].IsString(), "pair_unique_field type should be json string")
}
CYPHER_DB_PROCEDURE_GRAPH_CHECK();
if (ctx->txn_) ctx->txn_->Abort();
nlohmann::json json_data = nlohmann::json::parse(args[3].constant.AsString());
Expand All @@ -860,6 +892,11 @@ void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,
THROW_CODE(InputError, "end_spec missing 'type' or 'key'");
}

std::string pair_unique_field;
if (args.size() == 5) {
pair_unique_field = args[4].constant.AsString();
}

std::string start_type = start["type"].get<std::string>();
std::string start_json_key = start["key"].get<std::string>();
std::string end_type = end["type"].get<std::string>();
Expand Down Expand Up @@ -900,26 +937,42 @@ void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,

auto index_fds = txn.GetTxn()->ListEdgeIndexByLabel(args[0].constant.AsString());
std::unordered_map<size_t, bool> unique_indexs;
bool pair_unique_configured = false;
for (auto& index : index_fds) {
if (index.type == lgraph_api::IndexType::GlobalUniqueIndex) {
unique_indexs[txn.GetEdgeFieldId(label_id, index.field)] = true;
} else if (index.type == lgraph_api::IndexType::PairUniqueIndex) {
if (!pair_unique_field.empty() && index.field == pair_unique_field) {
pair_unique_configured = true;
}
}
}
if (!pair_unique_field.empty() && !pair_unique_configured) {
THROW_CODE(InputError, "No edge pair unique index is configured for this field: {}",
pair_unique_field);
}

int64_t json_total = json_data.size();
int64_t json_error = 0;
int64_t index_conflict = 0;
int64_t insert = 0;
int64_t update = 0;
std::vector<std::tuple<int64_t , int64_t, std::vector<size_t>, std::vector<size_t>,
std::vector<lgraph_api::FieldData>>> lines;
std::vector<lgraph_api::FieldData>, std::optional<size_t>>> lines;
for (auto& line : json_data) {
int64_t start_vid = -1;
int64_t end_vid = -1;
std::optional<size_t> pair_unique_pos;
std::vector<size_t> unique_pos;
std::vector<size_t> field_ids;
std::vector<lgraph_api::FieldData> fds;
bool success = true;
if (!pair_unique_field.empty()) {
if (!line.count(pair_unique_field)) {
json_error++;
continue;
}
}
for (auto& item : line.items()) {
if (item.key() == start_json_key) {
auto fd = JsonToFieldData(item.value(), start_pf_fs);
Expand Down Expand Up @@ -961,22 +1014,28 @@ void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,
field_ids.push_back(iter->second.first);
if (unique_indexs.count(iter->second.first)) {
unique_pos.push_back(field_ids.size() - 1);
} else {
if (!pair_unique_field.empty() && pair_unique_field == item.key()) {
pair_unique_pos = field_ids.size() - 1;
}
}
}
}
}
if (success && start_vid >= 0 && end_vid >= 0) {
lines.emplace_back(start_vid, end_vid, std::move(unique_pos),
std::move(field_ids), std::move(fds));
std::move(field_ids), std::move(fds), pair_unique_pos);
} else {
json_error++;
}
}
txn.Abort();
txn = db.CreateWriteTxn();
for (auto& l : lines) {
int ret = txn.UpsertEdge(std::get<0>(l), std::get<1>(l),
label_id, std::get<2>(l), std::get<3>(l), std::get<4>(l));
int ret = txn.UpsertEdge(
std::get<0>(l), std::get<1>(l),
label_id, std::get<2>(l),
std::get<3>(l), std::get<4>(l), std::get<5>(l));
if (ret == 0) {
index_conflict++;
} else if (ret == 1) {
Expand Down
29 changes: 24 additions & 5 deletions src/lgraph_api/lgraph_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,23 +269,42 @@ bool Transaction::UpsertEdge(int64_t src, int64_t dst, size_t label_id,
int Transaction::UpsertEdge(int64_t src, int64_t dst, size_t label_id,
const std::vector<size_t>& unique_pos,
const std::vector<size_t>& field_ids,
const std::vector<FieldData>& field_values) {
const std::vector<FieldData>& field_values,
std::optional<size_t> pair_unique_pos) {
ThrowIfInvalid();
for (auto pos : unique_pos) {
if (pos >= field_ids.size()) {
THROW_CODE(InputError, "unique_pos is out of the field_ids's range");
}
}
auto iter = txn_->GetOutEdgeIterator(EdgeUid(src, dst, label_id, 0, 0), false);
if (iter.IsValid()) {
std::optional<EdgeUid> euid;
if (pair_unique_pos.has_value()) {
if (pair_unique_pos.value() > field_ids.size()) {
THROW_CODE(InputError, "pair_unique_pos is out of the field_ids's range");
}
auto iter = txn_->GetEdgePairUniqueIndexIterator(
label_id, field_ids[pair_unique_pos.value()],
src, dst,
field_values[pair_unique_pos.value()],
field_values[pair_unique_pos.value()]);
if (iter.IsValid()) {
euid = iter.GetUid();
}
} else {
auto iter = txn_->GetOutEdgeIterator(EdgeUid(src, dst, label_id, 0, 0), false);
if (iter.IsValid()) {
euid = iter.GetUid();
}
}
if (euid.has_value()) {
for (auto pos : unique_pos) {
auto tmp = txn_->GetEdgeIndexIterator(
label_id, field_ids[pos], field_values[pos], field_values[pos]);
if (tmp.IsValid() && (tmp.GetUid() != iter.GetUid())) {
if (tmp.IsValid() && (tmp.GetUid() != euid.value())) {
return 0;
}
}
txn_->SetEdgeProperty(iter, field_ids, field_values);
txn_->SetEdgeProperty(euid.value(), field_ids, field_values);
return 2;
} else {
for (auto pos : unique_pos) {
Expand Down
21 changes: 15 additions & 6 deletions test/test_lgraph_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1243,11 +1243,20 @@ TEST_F(TestLGraphApi, pairUniqueIndex) {
txn = db.CreateWriteTxn();
auto like_lid = txn.GetEdgeLabelId("like");
auto fid = txn.GetEdgeFieldId(like_lid, "id");
auto iter = txn.GetEdgePairUniqueIndexIterator(like_lid, fid, vids[i], vids[i+1], FieldData::Int32(i), FieldData::Int32(i));
UT_EXPECT_TRUE(iter.IsValid());
auto euid = iter.GetUid();
auto iter2 = txn.GetOutEdgeIterator(euid);
UT_EXPECT_EQ(iter2.GetField("id"), FieldData::Int32(i));
{
auto iter = txn.GetEdgePairUniqueIndexIterator(
like_lid, fid, vids[i], vids[i + 1], FieldData::Int32(i), FieldData::Int32(i));
UT_EXPECT_TRUE(iter.IsValid());
auto euid = iter.GetUid();
auto iter2 = txn.GetOutEdgeIterator(euid);
UT_EXPECT_EQ(iter2.GetField("id"), FieldData::Int32(i));
}
{
auto iter = txn.GetEdgePairUniqueIndexIterator(like_lid, fid, vids[i], vids[i + 1],
FieldData::Int32(i + 1),
FieldData::Int32(i + 1));
UT_EXPECT_FALSE(iter.IsValid());
}
txn.Abort();
}
}
}
Loading

0 comments on commit 4fdb984

Please sign in to comment.