Skip to content

Commit

Permalink
optimize full-text index creation time (#1199)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
* optimize term tuple comparison for external sorting, reducing
full-text index creation time by 10%

### Type of change
- [x] Refactoring
- [x] Performance Improvement
  • Loading branch information
Ma-cat authored May 11, 2024
1 parent 01e0435 commit b0daaf1
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 10 deletions.
6 changes: 4 additions & 2 deletions src/storage/invertedindex/column_inverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ SizeT ColumnInverter::InvertColumn(SharedPtr<ColumnVector> column_vector, u32 ro
SizeT term_count_sum = 0;
for (SizeT i = 0; i < row_count; ++i) {
String data = column_vector->ToString(row_offset + i);
if (data.empty())
if (data.empty()) {
continue;
}
SizeT term_count = InvertColumn(begin_doc_id + i, data);
column_lengths[i] = term_count;
term_count_sum += term_count;
Expand Down Expand Up @@ -246,8 +247,9 @@ void ColumnInverter::SortForOfflineDump() {
// Data within each group
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {
// spill sort results for external merge sort
if (positions_.empty())
if (positions_.empty()) {
return;
}
// size of this Run in bytes
u32 data_size = 0;
u64 data_size_pos = ftell(spill_file);
Expand Down
38 changes: 38 additions & 0 deletions src/storage/invertedindex/common/external_sort_merger.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,44 @@ struct KeyAddress<KeyType, LenType, typename std::enable_if<std::is_scalar<KeyTy
bool operator<(const KeyAddress &other) const { return Compare(other) > 0; }
};

template <typename LenType>
struct KeyAddress<TermTuple, LenType> {
char *data{nullptr};
u64 addr;
u32 idx;

KeyAddress(char *p, u64 ad, u32 i) {
data = p;
addr = ad;
idx = i;
}

KeyAddress() {
data = nullptr;
addr = -1;
idx = -1;
}

TermTuple KEY() { return TermTuple(data + sizeof(LenType), LEN()); }
TermTuple KEY() const { return TermTuple(data + sizeof(LenType), LEN()); }
LenType LEN() const { return *(LenType *)data; }
u64 &ADDR() { return addr; }
u64 ADDR() const { return addr; }
u32 IDX() const { return idx; }
u32 &IDX() { return idx; }

int Compare(const KeyAddress &p) const {
return KEY().Compare(p.KEY());
}

bool operator==(const KeyAddress &other) const { return Compare(other) == 0; }

bool operator>(const KeyAddress &other) const { return Compare(other) < 0; }

bool operator<(const KeyAddress &other) const { return Compare(other) > 0; }
};


export template <typename KeyType, typename LenType>
class SortMerger {
typedef SortMerger<KeyType, LenType> self_t;
Expand Down
24 changes: 16 additions & 8 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ MemoryIndexer::~MemoryIndexer() {
}

void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, bool offline) {
if (is_spilled_)
if (is_spilled_) {
Load();
}

u64 seq_inserted(0);
u32 doc_count(0);
Expand Down Expand Up @@ -121,8 +122,9 @@ void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset
auto func = [this, task, inverter](int id) {
SizeT column_length_sum = inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_);
column_length_sum_ += column_length_sum;
if (column_length_sum > 0)
if (column_length_sum > 0) {
inverter->SortForOfflineDump();
}
this->ring_sorted_.Put(task->task_seq_, inverter);
};
inverting_thread_pool_.push(std::move(func));
Expand All @@ -145,8 +147,9 @@ void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset
}

void MemoryIndexer::InsertGap(u32 row_count) {
if (is_spilled_)
if (is_spilled_) {
Load();
}

std::unique_lock<std::mutex> lock(mutex_);
doc_count_ += row_count;
Expand All @@ -155,14 +158,16 @@ void MemoryIndexer::InsertGap(u32 row_count) {
void MemoryIndexer::Commit(bool offline) {
if (offline) {
commiting_thread_pool_.push([this](int id) { this->CommitOffline(); });
} else
} else {
commiting_thread_pool_.push([this](int id) { this->CommitSync(); });
}
}

SizeT MemoryIndexer::CommitOffline(SizeT wait_if_empty_ms) {
std::unique_lock<std::mutex> lock(mutex_commit_, std::defer_lock);
if (!lock.try_lock())
if (!lock.try_lock()) {
return 0;
}

if (nullptr == spill_file_handle_) {
PrepareSpillFile();
Expand Down Expand Up @@ -200,14 +205,16 @@ SizeT MemoryIndexer::CommitSync(SizeT wait_if_empty_ms) {
};

std::unique_lock<std::mutex> lock(mutex_commit_, std::defer_lock);
if (!lock.try_lock())
if (!lock.try_lock()) {
return 0;
}

while (1) {
this->ring_sorted_.GetBatch(inverters, wait_if_empty_ms);
// num_merged = inverters.size();
if (inverters.empty())
if (inverters.empty()) {
break;
}
for (auto &inverter : inverters) {
inverter->GeneratePosting();
num_generated += inverter->GetMerged();
Expand Down Expand Up @@ -353,8 +360,9 @@ void MemoryIndexer::OfflineDump() {
// 2. Generate posting
// 3. Dump disk segment data
// LOG_INFO(fmt::format("MemoryIndexer::OfflineDump begin, num_runs_ {}", num_runs_));
if (tuple_count_ == 0)
if (tuple_count_ == 0) {
return;
}
FinalSpillFile();
constexpr u32 buffer_size_of_each_run = 2 * 1024 * 1024;
SortMerger<TermTuple, u32> *merger = new SortMerger<TermTuple, u32>(spill_full_path_.c_str(), num_runs_, buffer_size_of_each_run * num_runs_, 2);
Expand Down

0 comments on commit b0daaf1

Please sign in to comment.