rework RecSplit Bucket
yperbasis committed Sep 18, 2024
1 parent c4996f6 commit cbc348f
Showing 1 changed file with 38 additions and 34 deletions.
silkworm/db/snapshots/rec_split/rec_split_par.hpp (72 changes: 38 additions & 34 deletions)
@@ -89,35 +89,39 @@ struct RecSplit<LEAF_SIZE>::ParallelBuildingStrategy : public BuildingStrategy {
     }
 
   protected:
-    struct Bucket {
+    class Bucket {
+      public:
         explicit Bucket(std::size_t bucket_size) {
-            keys.reserve(bucket_size);
-            values.reserve(bucket_size);
+            keys_.reserve(bucket_size);
+            values_.reserve(bucket_size);
         }
         Bucket(const Bucket&) = delete;
         Bucket(Bucket&&) noexcept = default;
 
+        void clear() {
+            keys_.clear();
+            values_.clear();
+            gr_builder_.clear();
+            index_ofs_.clear();
+        }
+
+      private:
+        friend class RecSplit;
+
         //! 64-bit fingerprints of keys in the current bucket accumulated before the recsplit is performed for that bucket
-        std::vector<uint64_t> keys;
+        std::vector<uint64_t> keys_;  // mike: current_bucket_; -> keys_
 
         //! Index offsets for the current bucket
-        std::vector<uint64_t> values;
+        std::vector<uint64_t> values_;  // mike: current_bucket_offsets_; -> values_
 
         //! Helper to build GR codes of splitting and bijection indices, local to current bucket
-        encoding::GolombRiceVector::LazyBuilder gr_builder;
+        encoding::GolombRiceVector::LazyBuilder gr_builder_;
 
         //! The local max index used in Golomb parameter array
-        uint16_t golomb_param_max_index{0};
+        uint16_t golomb_param_max_index_{0};
 
         //! Helper index output stream
-        std::stringstream index_ofs{std::ios::in | std::ios::out | std::ios::binary};
-
-        void clear() {
-            keys.clear();
-            values.clear();
-            gr_builder.clear();
-            index_ofs.clear();
-        }
+        std::stringstream index_ofs_{std::ios::in | std::ios::out | std::ios::binary};
     };
 
     void setup(const RecSplitSettings& settings, std::size_t bucket_count) override {
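
Note on the hunk above: the rework turns Bucket into a class with private, trailing-underscore data members, keeps it move-only, exposes a public clear(), and relies on a friend declaration so that RecSplit can still fill the bucket. A minimal standalone sketch of that pattern follows; the names (Builder, Bucket, keys_, values_) mirror the diff but this is an illustration, not the Silkworm code.

#include <cstdint>
#include <utility>
#include <vector>

class Builder;  // the friend that performs the real work

class Bucket {
  public:
    explicit Bucket(std::size_t bucket_size) {
        keys_.reserve(bucket_size);
        values_.reserve(bucket_size);
    }
    Bucket(const Bucket&) = delete;       // buckets are never copied...
    Bucket(Bucket&&) noexcept = default;  // ...only moved between workers

    void clear() {                        // public reset, as in the diff
        keys_.clear();
        values_.clear();
    }

  private:
    friend class Builder;                 // only Builder reaches the data

    std::vector<uint64_t> keys_;
    std::vector<uint64_t> values_;
};

class Builder {
  public:
    static void add(Bucket& bucket, uint64_t key, uint64_t value) {
        bucket.keys_.emplace_back(key);   // legal: Builder is a friend
        bucket.values_.emplace_back(value);
    }
};

int main() {
    std::vector<Bucket> buckets;
    buckets.emplace_back(16);             // move-only elements work in a vector
    Builder::add(buckets[0], /*key=*/42, /*value=*/7);
    Bucket moved = std::move(buckets[0]); // moving is still allowed
    moved.clear();
}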
@@ -152,11 +156,11 @@ struct RecSplit<LEAF_SIZE>::ParallelBuildingStrategy : public BuildingStrategy {
 
             auto current_key_count = keys_added_;
 
-            bucket.keys.emplace_back(bucket_key);
-            bucket.values.emplace_back(current_key_count);
+            bucket.keys_.emplace_back(bucket_key);
+            bucket.values_.emplace_back(current_key_count);
         } else {
-            bucket.keys.emplace_back(bucket_key);
-            bucket.values.emplace_back(offset);
+            bucket.keys_.emplace_back(bucket_key);
+            bucket.values_.emplace_back(offset);
         }
 
         keys_added_++;
@@ -185,27 +189,27 @@ struct RecSplit<LEAF_SIZE>::ParallelBuildingStrategy : public BuildingStrategy {
 
         bucket_size_accumulator_[0] = bucket_position_accumulator_[0] = 0;
         for (size_t i = 0; i < bucket_count_; i++) {
-            bucket_size_accumulator_[i + 1] = bucket_size_accumulator_[i] + buckets_[i].keys.size();
+            bucket_size_accumulator_[i + 1] = bucket_size_accumulator_[i] + buckets_[i].keys_.size();
 
-            // auto* underlying_buffer = buckets_[i].index_ofs.rdbuf();
+            // auto* underlying_buffer = buckets_[i].index_ofs_.rdbuf();
             // if (!is_empty(underlying_buffer))
             //     index_output_stream << underlying_buffer;
             char byte{0};
-            while (buckets_[i].index_ofs.get(byte)) {  // maybe it is better to avoid this and use a buffer in place of index_ofs
+            while (buckets_[i].index_ofs_.get(byte)) {  // maybe it is better to avoid this and use a buffer in place of index_ofs_
                 index_output_stream.put(byte);
             }
-            // index_output_stream << buckets_[i].index_ofs.rdbuf();  // better but fails when rdbuf() is empty
+            // index_output_stream << buckets_[i].index_ofs_.rdbuf();  // better but fails when rdbuf() is empty
 
-            if (buckets_[i].keys.size() > 1) {
-                buckets_[i].gr_builder.append_to(gr_builder_);
+            if (buckets_[i].keys_.size() > 1) {
+                buckets_[i].gr_builder_.append_to(gr_builder_);
             }
 
             bucket_position_accumulator_[i + 1] = gr_builder_.get_bits();
 
             SILKWORM_ASSERT(bucket_size_accumulator_[i + 1] >= bucket_size_accumulator_[i]);
             SILKWORM_ASSERT(bucket_position_accumulator_[i + 1] >= bucket_position_accumulator_[i]);
 
-            golomb_param_max_index = std::max(golomb_param_max_index, buckets_[i].golomb_param_max_index);
+            golomb_param_max_index = std::max(golomb_param_max_index, buckets_[i].golomb_param_max_index_);
         }
 
         gr_builder_.append_fixed(1, 1);  // Sentinel (avoids checking for parts of size 1)
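
Note on the hunk above: the commented-out `index_output_stream << buckets_[i].index_ofs_.rdbuf();` fails on empty buckets because streaming a streambuf that yields no characters sets failbit on the destination stream, which is why the code falls back to the byte-by-byte get/put loop. A minimal sketch of the usual guard follows; `append_stream` and the variable names are illustrative, not part of Silkworm.

#include <cassert>
#include <ostream>
#include <sstream>

// Appends the contents of `src` to `out`, tolerating an empty `src`.
void append_stream(std::ostream& out, std::stringstream& src) {
    if (src.rdbuf()->in_avail() > 0) {  // only stream when there is data to read
        out << src.rdbuf();             // bulk copy, no per-byte loop
    }
}

int main() {
    std::stringstream index_output_stream;
    std::stringstream empty_bucket_stream;
    std::stringstream full_bucket_stream;
    full_bucket_stream << "payload";

    append_stream(index_output_stream, empty_bucket_stream);  // no failbit set
    append_stream(index_output_stream, full_bucket_stream);

    assert(index_output_stream.good());
    assert(index_output_stream.str() == "payload");
}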
@@ -235,25 +239,25 @@ struct RecSplit<LEAF_SIZE>::ParallelBuildingStrategy : public BuildingStrategy {
     // It would be better to make this function a member of Bucket
     static bool recsplit_bucket(Bucket& bucket, uint8_t bytes_per_record) {
         // Sets of size 0 and 1 are not further processed, just write them to index
-        if (bucket.keys.size() > 1) {
-            if (contains_duplicate(bucket.keys)) {
+        if (bucket.keys_.size() > 1) {
+            if (contains_duplicate(bucket.keys_)) {
                 SILK_TRACE << "collision detected";
                 return true;
             }
 
             std::vector<uint64_t> buffer_keys;     // temporary buffer for keys
             std::vector<uint64_t> buffer_offsets;  // temporary buffer for offsets
-            buffer_keys.resize(bucket.keys.size());
-            buffer_offsets.resize(bucket.values.size());
+            buffer_keys.resize(bucket.keys_.size());
+            buffer_offsets.resize(bucket.values_.size());
 
             RecSplit<LEAF_SIZE>::recsplit(
-                bucket.keys, bucket.values, buffer_keys, buffer_offsets, bucket.gr_builder,
-                bucket.index_ofs, bucket.golomb_param_max_index, bytes_per_record);
+                bucket.keys_, bucket.values_, buffer_keys, buffer_offsets, bucket.gr_builder_,
+                bucket.index_ofs_, bucket.golomb_param_max_index_, bytes_per_record);
         } else {
-            for (const auto offset : bucket.values) {
+            for (const auto offset : bucket.values_) {
                 Bytes uint64_buffer(8, '\0');
                 endian::store_big_u64(uint64_buffer.data(), offset);
-                bucket.index_ofs.write(reinterpret_cast<const char*>(uint64_buffer.data()), 8);
+                bucket.index_ofs_.write(reinterpret_cast<const char*>(uint64_buffer.data()), 8);
                 SILK_TRACE << "[index] written offset: " << offset;
             }
         }
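
Note on the hunks above: the inline comment "maybe it is better to avoid this and use a buffer in place of index_ofs_" hints at replacing the per-bucket std::stringstream with a plain byte buffer, so the merge step becomes a single append with no failbit concerns. A sketch of that alternative follows; store_big_u64 here mirrors what endian::store_big_u64 does in the diff, and all names are hypothetical rather than the actual Silkworm API.

#include <cstdint>
#include <string>
#include <vector>

// Encode `value` as 8 big-endian bytes and append them to `out`.
void store_big_u64(std::string& out, uint64_t value) {
    for (int shift = 56; shift >= 0; shift -= 8) {
        out.push_back(static_cast<char>((value >> shift) & 0xFF));
    }
}

struct BucketSketch {
    std::string index_buffer;  // replaces std::stringstream index_ofs_

    void write_offset(uint64_t offset) {
        store_big_u64(index_buffer, offset);  // same 8-byte big-endian layout
    }
};

int main() {
    std::vector<BucketSketch> buckets(2);
    buckets[0].write_offset(7);
    buckets[1].write_offset(42);

    // Merge step: concatenating strings cannot fail the way an empty rdbuf() does.
    std::string index_output;
    for (const auto& bucket : buckets) {
        index_output += bucket.index_buffer;
    }
    // index_output now holds 16 bytes: two big-endian 64-bit offsets.
    return index_output.size() == 16 ? 0 : 1;
}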