Skip to content

Commit

Permalink
[Improvement](load) accelerate streamload and compaction (apache#12119)
Browse files Browse the repository at this point in the history
* [Improvement](load) accelerate streamload and compaction
  • Loading branch information
Gabriel39 authored and HappenLee committed Aug 29, 2022
1 parent ca4785b commit c741137
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 49 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ Status ScalarColumnWriter::init() {
_null_bitmap_builder.reset(new NullBitmapBuilder());
}
if (_opts.need_zone_map) {
_zone_map_index_builder.reset(new ZoneMapIndexWriter(get_field()));
RETURN_IF_ERROR(ZoneMapIndexWriter::create(get_field(), _zone_map_index_builder));
}
if (_opts.need_bitmap_index) {
RETURN_IF_ERROR(
Expand Down
84 changes: 68 additions & 16 deletions be/src/olap/rowset/segment_v2/zone_map_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace doris {

namespace segment_v2 {

ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field) {
template <PrimitiveType Type>
TypedZoneMapIndexWriter<Type>::TypedZoneMapIndexWriter(Field* field) : _field(field) {
_page_zone_map.min_value = _field->allocate_zone_map_value(&_pool);
_page_zone_map.max_value = _field->allocate_zone_map_value(&_pool);
_reset_zone_map(&_page_zone_map);
Expand All @@ -38,35 +39,44 @@ ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field) {
_reset_zone_map(&_segment_zone_map);
}

void ZoneMapIndexWriter::add_values(const void* values, size_t count) {
template <PrimitiveType Type>
void TypedZoneMapIndexWriter<Type>::add_values(const void* values, size_t count) {
if (count > 0) {
_page_zone_map.has_not_null = true;
}
const char* vals = reinterpret_cast<const char*>(values);
for (int i = 0; i < count; ++i) {
if (_field->compare(_page_zone_map.min_value, vals) > 0) {
_field->type_info()->direct_copy_may_cut(_page_zone_map.min_value, vals);
}
if (_field->compare(_page_zone_map.max_value, vals) < 0) {
_field->type_info()->direct_copy_may_cut(_page_zone_map.max_value, vals);
}
vals += _field->size();
using ValType =
std::conditional_t<Type == TYPE_DATE, uint24_t,
typename PredicatePrimitiveTypeTraits<Type>::PredicateFieldType>;
const ValType* vals = reinterpret_cast<const ValType*>(values);
auto [min, max] = std::minmax_element(vals, vals + count);
if (unaligned_load<ValType>(min) < unaligned_load<ValType>(_page_zone_map.min_value)) {
_field->type_info()->direct_copy_may_cut(_page_zone_map.min_value,
reinterpret_cast<const void*>(min));
}
if (unaligned_load<ValType>(max) > unaligned_load<ValType>(_page_zone_map.max_value)) {
_field->type_info()->direct_copy_may_cut(_page_zone_map.max_value,
reinterpret_cast<const void*>(max));
}
}

void ZoneMapIndexWriter::moidfy_index_before_flush(struct doris::segment_v2::ZoneMap& zone_map) {
template <PrimitiveType Type>
void TypedZoneMapIndexWriter<Type>::moidfy_index_before_flush(
struct doris::segment_v2::ZoneMap& zone_map) {
_field->modify_zone_map_index(zone_map.max_value);
}

void ZoneMapIndexWriter::reset_page_zone_map() {
template <PrimitiveType Type>
void TypedZoneMapIndexWriter<Type>::reset_page_zone_map() {
_page_zone_map.pass_all = true;
}

void ZoneMapIndexWriter::reset_segment_zone_map() {
template <PrimitiveType Type>
void TypedZoneMapIndexWriter<Type>::reset_segment_zone_map() {
_segment_zone_map.pass_all = true;
}

Status ZoneMapIndexWriter::flush() {
template <PrimitiveType Type>
Status TypedZoneMapIndexWriter<Type>::flush() {
// Update segment zone map.
if (_field->compare(_segment_zone_map.min_value, _page_zone_map.min_value) > 0) {
_field->type_info()->direct_copy(_segment_zone_map.min_value, _page_zone_map.min_value);
Expand Down Expand Up @@ -96,7 +106,9 @@ Status ZoneMapIndexWriter::flush() {
return Status::OK();
}

Status ZoneMapIndexWriter::finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta) {
template <PrimitiveType Type>
Status TypedZoneMapIndexWriter<Type>::finish(io::FileWriter* file_writer,
ColumnIndexMetaPB* index_meta) {
index_meta->set_type(ZONE_MAP_INDEX);
ZoneMapIndexPB* meta = index_meta->mutable_zone_map_index();
// store segment zone map
Expand Down Expand Up @@ -152,5 +164,45 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) {
return Status::OK();
}

#define APPLY_FOR_PRIMITITYPE(M) \
M(TYPE_TINYINT) \
M(TYPE_SMALLINT) \
M(TYPE_INT) \
M(TYPE_BIGINT) \
M(TYPE_LARGEINT) \
M(TYPE_FLOAT) \
M(TYPE_DOUBLE) \
M(TYPE_CHAR) \
M(TYPE_DATE) \
M(TYPE_DATETIME) \
M(TYPE_DATEV2) \
M(TYPE_DATETIMEV2) \
M(TYPE_VARCHAR) \
M(TYPE_STRING) \
M(TYPE_DECIMAL32) \
M(TYPE_DECIMAL64) \
M(TYPE_DECIMAL128)

Status ZoneMapIndexWriter::create(Field* field, std::unique_ptr<ZoneMapIndexWriter>& res) {
switch (field->type()) {
#define M(NAME) \
case OLAP_FIELD_##NAME: { \
res.reset(new TypedZoneMapIndexWriter<NAME>(field)); \
return Status::OK(); \
}
APPLY_FOR_PRIMITITYPE(M)
#undef M
case OLAP_FIELD_TYPE_DECIMAL: {
res.reset(new TypedZoneMapIndexWriter<TYPE_DECIMALV2>(field));
return Status::OK();
}
case OLAP_FIELD_TYPE_BOOL: {
res.reset(new TypedZoneMapIndexWriter<TYPE_BOOLEAN>(field));
return Status::OK();
}
default:
return Status::InvalidArgument("Invalid type!");
}
}
} // namespace segment_v2
} // namespace doris
46 changes: 36 additions & 10 deletions be/src/olap/rowset/segment_v2/zone_map_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,55 @@ struct ZoneMap {
}
};

class ZoneMapIndexWriter {
public:
static Status create(Field* field, std::unique_ptr<ZoneMapIndexWriter>& res);

ZoneMapIndexWriter() = default;

virtual ~ZoneMapIndexWriter() = default;

virtual void add_values(const void* values, size_t count) = 0;

virtual void add_nulls(uint32_t count) = 0;

// mark the end of one data page so that we can finalize the corresponding zone map
virtual Status flush() = 0;

virtual Status finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta) = 0;

virtual void moidfy_index_before_flush(ZoneMap& zone_map) = 0;

virtual uint64_t size() const = 0;

virtual void reset_page_zone_map() = 0;
virtual void reset_segment_zone_map() = 0;
};

// Zone map index is represented by an IndexedColumn with ordinal index.
// The IndexedColumn stores serialized ZoneMapPB for each data page.
// It also create and store the segment-level zone map in the index meta so that
// reader can prune an entire segment without reading pages.
class ZoneMapIndexWriter {
template <PrimitiveType Type>
class TypedZoneMapIndexWriter final : public ZoneMapIndexWriter {
public:
explicit ZoneMapIndexWriter(Field* field);
explicit TypedZoneMapIndexWriter(Field* field);

void add_values(const void* values, size_t count);
void add_values(const void* values, size_t count) override;

void add_nulls(uint32_t count) { _page_zone_map.has_null = true; }
void add_nulls(uint32_t count) override { _page_zone_map.has_null = true; }

// mark the end of one data page so that we can finalize the corresponding zone map
Status flush();
Status flush() override;

Status finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta);
Status finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta) override;

void moidfy_index_before_flush(ZoneMap& zone_map);
void moidfy_index_before_flush(ZoneMap& zone_map) override;

uint64_t size() const { return _estimated_size; }
uint64_t size() const override { return _estimated_size; }

void reset_page_zone_map();
void reset_segment_zone_map();
void reset_page_zone_map() override;
void reset_segment_zone_map() override;

private:
void _reset_zone_map(ZoneMap* zone_map) {
Expand Down
47 changes: 25 additions & 22 deletions be/test/olap/rowset/segment_v2/zone_map_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,31 @@ class ColumnZoneMapTest : public testing::Test {
std::string filename = kTestDir + "/" + testname;
auto fs = io::global_local_filesystem();

ZoneMapIndexWriter builder(field);
std::unique_ptr<ZoneMapIndexWriter> builder(nullptr);
ZoneMapIndexWriter::create(field, builder);
std::vector<std::string> values1 = {"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"};
for (auto& value : values1) {
Slice slice(value);
builder.add_values((const uint8_t*)&slice, 1);
builder->add_values((const uint8_t*)&slice, 1);
}
builder.flush();
builder->flush();
std::vector<std::string> values2 = {"aaaaa", "bbbbb", "ccccc", "ddddd", "eeeee", "fffff"};
for (auto& value : values2) {
Slice slice(value);
builder.add_values((const uint8_t*)&slice, 1);
builder->add_values((const uint8_t*)&slice, 1);
}
builder.add_nulls(1);
builder.flush();
builder->add_nulls(1);
builder->flush();
for (int i = 0; i < 6; ++i) {
builder.add_nulls(1);
builder->add_nulls(1);
}
builder.flush();
builder->flush();
// write out zone map index
ColumnIndexMetaPB index_meta;
{
io::FileWriterPtr file_writer;
EXPECT_TRUE(fs->create_file(filename, &file_writer).ok());
EXPECT_TRUE(builder.finish(file_writer.get(), &index_meta).ok());
EXPECT_TRUE(builder->finish(file_writer.get(), &index_meta).ok());
EXPECT_EQ(ZONE_MAP_INDEX, index_meta.type());
EXPECT_TRUE(file_writer->close().ok());
}
Expand Down Expand Up @@ -108,22 +109,23 @@ class ColumnZoneMapTest : public testing::Test {
std::string filename = kTestDir + "/" + testname;
auto fs = io::global_local_filesystem();

ZoneMapIndexWriter builder(field);
std::unique_ptr<ZoneMapIndexWriter> builder(nullptr);
ZoneMapIndexWriter::create(field, builder);
char ch = 'a';
char buf[1024];
for (int i = 0; i < 5; i++) {
memset(buf, ch + i, 1024);
Slice slice(buf, 1024);
builder.add_values((const uint8_t*)&slice, 1);
builder->add_values((const uint8_t*)&slice, 1);
}
builder.flush();
builder->flush();

// write out zone map index
ColumnIndexMetaPB index_meta;
{
io::FileWriterPtr file_writer;
EXPECT_TRUE(fs->create_file(filename, &file_writer).ok());
EXPECT_TRUE(builder.finish(file_writer.get(), &index_meta).ok());
EXPECT_TRUE(builder->finish(file_writer.get(), &index_meta).ok());
EXPECT_EQ(ZONE_MAP_INDEX, index_meta.type());
EXPECT_TRUE(file_writer->close().ok());
}
Expand Down Expand Up @@ -156,26 +158,27 @@ TEST_F(ColumnZoneMapTest, NormalTestIntPage) {
TabletColumn int_column = create_int_key(0);
Field* field = FieldFactory::create(int_column);

ZoneMapIndexWriter builder(field);
std::unique_ptr<ZoneMapIndexWriter> builder(nullptr);
ZoneMapIndexWriter::create(field, builder);
std::vector<int> values1 = {1, 10, 11, 20, 21, 22};
for (auto value : values1) {
builder.add_values((const uint8_t*)&value, 1);
builder->add_values((const uint8_t*)&value, 1);
}
builder.flush();
builder->flush();
std::vector<int> values2 = {2, 12, 31, 23, 21, 22};
for (auto value : values2) {
builder.add_values((const uint8_t*)&value, 1);
builder->add_values((const uint8_t*)&value, 1);
}
builder.add_nulls(1);
builder.flush();
builder.add_nulls(6);
builder.flush();
builder->add_nulls(1);
builder->flush();
builder->add_nulls(6);
builder->flush();
// write out zone map index
ColumnIndexMetaPB index_meta;
{
io::FileWriterPtr file_writer;
EXPECT_TRUE(fs->create_file(filename, &file_writer).ok());
EXPECT_TRUE(builder.finish(file_writer.get(), &index_meta).ok());
EXPECT_TRUE(builder->finish(file_writer.get(), &index_meta).ok());
EXPECT_EQ(ZONE_MAP_INDEX, index_meta.type());
EXPECT_TRUE(file_writer->close().ok());
}
Expand Down

0 comments on commit c741137

Please sign in to comment.