Skip to content

Commit

Permalink
Skip empty value when adding index
Browse files Browse the repository at this point in the history
  • Loading branch information
ljcui committed Nov 20, 2024
1 parent 08f0253 commit 28d4320
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 36 deletions.
20 changes: 13 additions & 7 deletions src/core/kv_table_comparators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ struct KeyCompareFunc {
* The comparator for bytes or strings.
*/
static int LexicalKeyVidCompareFunc(const MDB_val* a, const MDB_val* b) {
int diff;
int len_diff;
unsigned int len;
int diff = 0;
int len_diff = 0;
unsigned int len = 0;

len = static_cast<int>(a->mv_size) - VID_SIZE;
len_diff = static_cast<int>(a->mv_size) - static_cast<int>(b->mv_size);
Expand All @@ -59,12 +59,15 @@ static int LexicalKeyVidCompareFunc(const MDB_val* a, const MDB_val* b) {
}

diff = memcmp(a->mv_data, b->mv_data, len);
if (diff == 0 && len_diff == 0) {
if (diff) {
return diff;
} else if (len_diff) {
return len_diff;
} else {
int64_t a_vid = GetVid((char*)a->mv_data + a->mv_size - VID_SIZE);
int64_t b_vid = GetVid((char*)b->mv_data + b->mv_size - VID_SIZE);
return a_vid < b_vid ? -1 : a_vid > b_vid ? 1 : 0;
}
return static_cast<int>(diff ? diff : len_diff < 0 ? -1 : len_diff);
}

/**
Expand Down Expand Up @@ -114,7 +117,11 @@ static int LexicalKeyEuidCompareFunc(const MDB_val* a, const MDB_val* b) {
}

diff = memcmp(a->mv_data, b->mv_data, len);
if (diff == 0 && len_diff == 0) {
if (diff) {
return diff;
} else if (len_diff) {
return len_diff;
} else {
int64_t a_vid1 = GetVid((char*)a->mv_data + a->mv_size - EUID_SIZE);
int64_t a_vid2 = GetVid((char*)a->mv_data + a->mv_size - EUID_SIZE + VID_SIZE);
int64_t a_lid = GetLabelId((char*)a->mv_data + a->mv_size - EUID_SIZE + LID_BEGIN);
Expand All @@ -133,7 +140,6 @@ static int LexicalKeyEuidCompareFunc(const MDB_val* a, const MDB_val* b) {
: a_eid > b_eid ? 1
: 0;
}
return static_cast<int>(diff ? diff : len_diff < 0 ? -1 : len_diff);
}

template <FieldType DT>
Expand Down
16 changes: 10 additions & 6 deletions src/core/lightning_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2014,25 +2014,29 @@ bool LightningGraph::BlockingAddIndex(const std::string& label, const std::strin
FMA_FMT("start building vertex index for {}:{} in detached model", label, field);
VertexIndex* index = extractor->GetVertexIndex();
uint64_t count = 0;
uint64_t filter = 0;
auto kv_iter = schema->GetPropertyTable().GetIterator(txn.GetTxn());
for (kv_iter->GotoFirstKey(); kv_iter->IsValid(); kv_iter->Next()) {
auto vid = graph::KeyPacker::GetVidFromPropertyTableKey(kv_iter->GetKey());
auto prop = kv_iter->GetValue();
if (extractor->GetIsNull(prop)) {
auto props = kv_iter->GetValue();
auto prop = extractor->GetConstRef(props);
if (prop.Empty()) {
filter++;
continue;
}
if (!index->Add(txn.GetTxn(), extractor->GetConstRef(prop), vid)) {
if (!index->Add(txn.GetTxn(), prop, vid)) {
THROW_CODE(InternalError,
"Failed to index vertex [{}] with field value [{}:{}]",
vid, extractor->Name(), extractor->FieldToString(prop));
vid, extractor->Name(), extractor->FieldToString(props));
}
count++;
if (count % 100000 == 0) {
LOG_DEBUG() << "index count: " << count;
LOG_INFO() << "index count: " << count;
}
}
kv_iter.reset();
LOG_DEBUG() << "index count: " << count;
LOG_INFO() << "index count: " << count;
LOG_INFO() << FMA_FMT("{} records are skipped during adding index.", filter);
txn.Commit();
schema_.Assign(new_schema.release());
LOG_INFO() <<
Expand Down
21 changes: 15 additions & 6 deletions src/core/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ void Schema::DeleteVertexFullTextIndex(VertexId vid, std::vector<FTIndexEntry>&
void Schema::DeleteVertexIndex(KvTransaction& txn, VertexId vid, const Value& record) {
for (auto& idx : indexed_fields_) {
auto& fe = fields_[idx];
if (fe.GetIsNull(record)) continue;
auto prop = fe.GetConstRef(record);
if (prop.Empty()) {
continue;
}
if (fe.Type() != FieldType::FLOAT_VECTOR) {
VertexIndex* index = fe.GetVertexIndex();
FMA_ASSERT(index);
// update field index
if (!index->Delete(txn, fe.GetConstRef(record), vid)) {
if (!index->Delete(txn, prop, vid)) {
THROW_CODE(InputError, "Failed to un-index vertex [{}] with field "
"value [{}:{}]: index value does not exist.",
vid, fe.Name(), fe.FieldToString(record));
Expand Down Expand Up @@ -102,11 +105,14 @@ void Schema::DeleteCreatedVertexIndex(KvTransaction& txn, VertexId vid, const Va
const std::vector<size_t>& created) {
for (auto& idx : created) {
auto& fe = fields_[idx];
if (fe.GetIsNull(record)) continue;
auto prop = fe.GetConstRef(record);
if (prop.Empty()) {
continue;
}
VertexIndex* index = fe.GetVertexIndex();
FMA_ASSERT(index);
// the aim of this method is delete the index that has been created
if (!index->Delete(txn, fe.GetConstRef(record), vid)) {
if (!index->Delete(txn, prop, vid)) {
THROW_CODE(InputError, "Failed to un-index vertex [{}] with field "
"value [{}:{}]: index value does not exist.",
vid, fe.Name(), fe.FieldToString(record));
Expand Down Expand Up @@ -155,12 +161,15 @@ void Schema::AddVertexToIndex(KvTransaction& txn, VertexId vid, const Value& rec
created.reserve(fields_.size());
for (auto& idx : indexed_fields_) {
auto& fe = fields_[idx];
if (fe.GetIsNull(record)) continue;
auto prop = fe.GetConstRef(record);
if (prop.Empty()) {
continue;
}
if (fe.Type() != FieldType::FLOAT_VECTOR) {
VertexIndex* index = fe.GetVertexIndex();
FMA_ASSERT(index);
// update field index
if (!index->Add(txn, fe.GetConstRef(record), vid)) {
if (!index->Add(txn, prop, vid)) {
THROW_CODE(InputError,
"Failed to index vertex [{}] with field value [{}:{}]: index value already exists.",
vid, fe.Name(), fe.FieldToString(record));
Expand Down
29 changes: 12 additions & 17 deletions src/core/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,12 +970,10 @@ Transaction::SetVertexProperty(VertexIterator& it, size_t n_fields, const FieldT
fe->ParseAndSet(new_prop, values[i]);
VectorIndex* index = fe->GetVectorIndex();
if (index) {
bool oldnull = fe->GetIsNull(old_prop);
bool newnull = fe->GetIsNull(new_prop);
auto old_v = fe->GetConstRef(old_prop);
auto new_v = fe->GetConstRef(new_prop);
std::vector<int64_t> vids {vid};
if (!oldnull && !newnull) {
const auto& old_v = fe->GetConstRef(old_prop);
const auto& new_v = fe->GetConstRef(new_prop);
if (!old_v.Empty() && !new_v.Empty()) {
if (old_v == new_v) {
continue;
}
Expand All @@ -991,9 +989,8 @@ Transaction::SetVertexProperty(VertexIterator& it, size_t n_fields, const FieldT
floatvector.back().size(), dim);
}
index->Add(floatvector, vids);
} else if (oldnull && !newnull) {
} else if (old_v.Empty() && !new_v.Empty()) {
// add
const auto& new_v = fe->GetConstRef(new_prop);
auto dim = index->GetVecDimension();
std::vector<std::vector<float>> floatvector;
floatvector.emplace_back(new_v.AsFloatVector());
Expand All @@ -1003,7 +1000,7 @@ Transaction::SetVertexProperty(VertexIterator& it, size_t n_fields, const FieldT
floatvector.back().size(), dim);
}
index->Add(floatvector, vids);
} else if (!oldnull && newnull) {
} else if (!old_v.Empty() && new_v.Empty()) {
// delete
index->Remove(vids);
}
Expand All @@ -1013,12 +1010,10 @@ Transaction::SetVertexProperty(VertexIterator& it, size_t n_fields, const FieldT
// update index if there is no error
VertexIndex* index = fe->GetVertexIndex();
if (index && index->IsReady()) {
bool oldnull = fe->GetIsNull(old_prop);
bool newnull = fe->GetIsNull(new_prop);
if (!oldnull && !newnull) {
auto old_v = fe->GetConstRef(old_prop);
auto new_v = fe->GetConstRef(new_prop);
if (!old_v.Empty() && !new_v.Empty()) {
// update
const auto& old_v = fe->GetConstRef(old_prop);
const auto& new_v = fe->GetConstRef(new_prop);
if (old_v == new_v) {
// If the values are equal, there is no need to update the index.
continue;
Expand All @@ -1028,16 +1023,16 @@ Transaction::SetVertexProperty(VertexIterator& it, size_t n_fields, const FieldT
THROW_CODE(InputError,
"failed to update vertex index, {}:[{}] already exists", fe->Name(),
fe->FieldToString(new_prop));
} else if (oldnull && !newnull) {
} else if (old_v.Empty() && !new_v.Empty()) {
// set to non-null, add index
bool r = index->Add(*txn_, fe->GetConstRef(new_prop), vid);
bool r = index->Add(*txn_, new_v, vid);
if (!r)
THROW_CODE(InputError,
"failed to add vertex index, {}:[{}] already exists", fe->Name(),
fe->FieldToString(new_prop));
} else if (!oldnull && newnull) {
} else if (!old_v.Empty() && new_v.Empty()) {
// set to null, delete index
bool r = index->Delete(*txn_, fe->GetConstRef(old_prop), vid);
bool r = index->Delete(*txn_, old_v, vid);
FMA_DBG_ASSERT(r);
} else {
// both null, nothing to do
Expand Down

0 comments on commit 28d4320

Please sign in to comment.