Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix large page read and bitmap index #427

Merged
merged 6 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static bool is_init = false;

void init_tsfile_config() {
if (!is_init) {
common::init_config_value();
common::init_common();
is_init = true;
}
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/encoding/ts2diff_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class TS2DIFFDecoder : public Decoder {
int64_t read_long(int bits, common::ByteStream &in) {
int64_t value = 0;
while (bits > 0) {
read_byte_if_empty(in);
if (bits > bits_left_ || bits == 8) {
// Take only the bits_left_ "least significant" bits.
uint8_t d = (uint8_t)(buffer_ & ((1 << bits_left_) - 1));
Expand All @@ -93,7 +94,6 @@ class TS2DIFFDecoder : public Decoder {
if (bits <= 0 && current_index_ == 0) {
break;
}
read_byte_if_empty(in);
}
return value;
}
Expand Down Expand Up @@ -129,14 +129,15 @@ inline int32_t TS2DIFFDecoder<int32_t>::decode(common::ByteStream &in) {
ret_value = first_value_;
bits_left_ = 0;
buffer_ = 0;
read_byte_if_empty(in);
current_index_ = 1;
return ret_value;
}
if (current_index_++ >= write_index_) {
current_index_ = 0;
}
stored_value_ = (int32_t)read_long(bit_width_, in);
// although it seems we are reading an int64, bit_width_ guarantees
// that it does not overflow int32
stored_value_ = read_long(bit_width_, in);
ret_value = stored_value_ + first_value_ + delta_min_;
first_value_ = ret_value;

Expand All @@ -151,7 +152,6 @@ inline int64_t TS2DIFFDecoder<int64_t>::decode(common::ByteStream &in) {
common::SerializationUtil::read_i64(delta_min_, in);
common::SerializationUtil::read_i64(first_value_, in);
ret_value = first_value_;
read_byte_if_empty(in);
current_index_ = 1;
return ret_value;
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/encoding/ts2diff_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ inline int TS2DIFFEncoder<int64_t>::flush(common::ByteStream &out_stream) {
// Calculate the bit width needed to write each value
int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_);
// writer header
common::SerializationUtil::write_ui32(write_index_, out_stream);
common::SerializationUtil::write_ui32(bit_width, out_stream);
common::SerializationUtil::write_ui64(delta_arr_min_, out_stream);
common::SerializationUtil::write_ui64(first_value_, out_stream);
common::SerializationUtil::write_i32(write_index_, out_stream);
common::SerializationUtil::write_i32(bit_width, out_stream);
common::SerializationUtil::write_i64(delta_arr_min_, out_stream);
common::SerializationUtil::write_i64(first_value_, out_stream);
// writer data
for (int i = 0; i < write_index_; i++) {
write_bits(delta_arr_[i], bit_width, out_stream);
Expand Down
24 changes: 15 additions & 9 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
// @in_stream_
int AlignedChunkReader::read_from_file_and_rewrap(
common::ByteStream &in_stream_, ChunkMeta *&chunk_meta,
uint32_t &chunk_visit_offset, int32_t file_data_buf_size, int want_size) {
uint32_t &chunk_visit_offset, int32_t &file_data_buf_size, int want_size) {
int ret = E_OK;
const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size
char *file_data_buf = in_stream_.get_wrapped_buf();
Expand Down Expand Up @@ -350,8 +350,8 @@ int AlignedChunkReader::decode_cur_time_page_data() {
// << cur_page_header_.compressed_size_ << std::endl;
if (RET_FAIL(read_from_file_and_rewrap(
time_in_stream_, time_chunk_meta_, time_chunk_visit_offset_,
cur_time_page_header_.compressed_size_,
file_data_time_buf_size_))) {
file_data_time_buf_size_,
// Request the compressed size from the *time* page header: this call
// refills time_in_stream_, so the value page header's size (used by
// decode_cur_value_page_data) is the wrong one to pass here.
cur_time_page_header_.compressed_size_))) {
}
}

Expand Down Expand Up @@ -429,8 +429,8 @@ int AlignedChunkReader::decode_cur_value_page_data() {
// << cur_page_header_.compressed_size_ << std::endl;
if (RET_FAIL(read_from_file_and_rewrap(
value_in_stream_, value_chunk_meta_, value_chunk_visit_offset_,
cur_value_page_header_.compressed_size_,
file_data_value_buf_size_))) {
file_data_value_buf_size_,
cur_value_page_header_.compressed_size_))) {
}
}

Expand Down Expand Up @@ -529,19 +529,26 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
int64_t time = 0; \
CppType value; \
while ((time_decoder_->has_remaining() || time_in.has_remaining()) \
&& (value_decoder_->has_remaining() || \
value_in.has_remaining())){ \
&& (value_decoder_->has_remaining() || \
value_in.has_remaining())){ \
cur_value_index++; \
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \
0xFF) & \
(mask >> (cur_value_index % 8))) == 0) { \
RET_FAIL(time_decoder_->read_int64(time, time_in)); \
ret = time_decoder_->read_int64(time, time_in); \
if (ret != E_OK) { \
break; \
} \
ret = value_decoder_->read_##ReadType(value, \
value_in); \
if (ret != E_OK) { \
break; \
} \
continue; \
} \
if (UNLIKELY(!row_appender.add_row())) { \
ret = E_OVERFLOW; \
cur_value_index--; \
break; \
} else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \
} else if (RET_FAIL(value_decoder_->read_##ReadType(value, \
Expand Down Expand Up @@ -569,7 +576,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
while ((time_decoder_->has_remaining() &&
value_decoder_->has_remaining()) ||
(time_in.has_remaining() && value_in.has_remaining())) {
cur_value_index++;
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
(mask >> (cur_value_index % 8))) == 0) {
RET_FAIL(time_decoder_->read_int64(time, time_in));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reader/aligned_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class AlignedChunkReader : public IChunkReader {
int read_from_file_and_rewrap(common::ByteStream &in_stream_,
ChunkMeta *&chunk_meta,
uint32_t &chunk_visit_offset,
int32_t file_data_buf_size,
int32_t &file_data_buf_size,
int want_size = 0);
bool cur_page_statisify_filter(Filter *filter);
int skip_cur_page();
Expand Down
1 change: 0 additions & 1 deletion cpp/src/reader/block/single_device_tsblock_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
int ret = common::E_OK;
pa_.init(512, common::AllocModID::MOD_TSFILE_READER);
tuple_desc_.reset();
common::init_common();
auto table_schema = device_query_task->get_table_schema();
tuple_desc_.push_back(common::g_time_column_schema);
for (const auto& column_name : device_query_task_->get_column_names()) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/reader/qds_without_timegenerator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader,

for (size_t i = 0; i < path_count; i++) {
get_next_tsblock(i, true);
data_types.push_back(value_iters_[i]->get_data_type());
data_types.push_back(value_iters_[i] != nullptr ?
value_iters_[i]->get_data_type() : TSDataType::NULL_TYPE);
}
result_set_metadata_ =
std::make_shared<ResultSetMetadata>(column_names, data_types);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reader/table_query_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TableQueryExecutor {
tsfile_io_reader_->init(read_file);
meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_);
table_query_ordering_ = TableQueryOrdering::DEVICE;
block_size_ = 1024;
block_size_ = 10240;
}
~TableQueryExecutor() {
if (meta_data_querier_ != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ int libtsfile_init() {
}
ModStat::get_instance().init();

init_config_value();
init_common();

g_s_is_inited = true;
return E_OK;
Expand Down
44 changes: 43 additions & 1 deletion cpp/test/encoding/ts2diff_codec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TS2DIFFCodecTest : public ::testing::Test {
LongTS2DIFFDecoder* decoder_long_;
};

TEST_F(TS2DIFFCodecTest, TestIntEncoding) {
TEST_F(TS2DIFFCodecTest, TestIntEncoding1) {
common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
const int row_num = 10000;
int32_t data[row_num];
Expand All @@ -79,7 +79,49 @@ TEST_F(TS2DIFFCodecTest, TestIntEncoding) {
}
}

// Round-trips the ascending sequence 0..9999 through the int32 TS2DIFF
// encoder/decoder pair and checks every decoded value against the input.
TEST_F(TS2DIFFCodecTest, TestIntEncoding2) {
    common::ByteStream encoded(1024, common::MOD_TS2DIFF_OBJ, false);
    const int kRowCount = 10000;
    int32_t expected[kRowCount];
    memset(expected, 0, sizeof(expected));
    for (int row = 0; row < kRowCount; ++row) {
        expected[row] = row;
    }

    // Feed every value to the encoder, then flush what is still buffered.
    for (int row = 0; row < kRowCount; ++row) {
        const int encode_ret = encoder_int_->encode(expected[row], encoded);
        EXPECT_EQ(encode_ret, common::E_OK);
    }
    const int flush_ret = encoder_int_->flush(encoded);
    EXPECT_EQ(flush_ret, common::E_OK);

    // Decode from the same stream and compare element by element.
    int32_t decoded;
    for (int row = 0; row < kRowCount; ++row) {
        const int read_ret = decoder_int_->read_int32(decoded, encoded);
        EXPECT_EQ(read_ret, common::E_OK);
        EXPECT_EQ(decoded, expected[row]);
    }
}

// Round-trips the ascending sequence 0..9999 through the int64 TS2DIFF
// encoder/decoder pair and checks every decoded value against the input.
TEST_F(TS2DIFFCodecTest, TestLongEncoding) {
    common::ByteStream encoded(1024, common::MOD_TS2DIFF_OBJ, false);
    const int kRowCount = 10000;
    int64_t expected[kRowCount];
    memset(expected, 0, sizeof(expected));
    for (int row = 0; row < kRowCount; ++row) {
        expected[row] = row;
    }

    // Feed every value to the encoder, then flush what is still buffered.
    for (int row = 0; row < kRowCount; ++row) {
        const int encode_ret = encoder_long_->encode(expected[row], encoded);
        EXPECT_EQ(encode_ret, common::E_OK);
    }
    const int flush_ret = encoder_long_->flush(encoded);
    EXPECT_EQ(flush_ret, common::E_OK);

    // Decode from the same stream and compare element by element.
    int64_t decoded;
    for (int row = 0; row < kRowCount; ++row) {
        const int read_ret = decoder_long_->read_int64(decoded, encoded);
        EXPECT_EQ(read_ret, common::E_OK);
        EXPECT_EQ(decoded, expected[row]);
    }
}

TEST_F(TS2DIFFCodecTest, TestLongEncoding2) {
common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
const int row_num = 10000;
int64_t data[row_num];
Expand Down
15 changes: 13 additions & 2 deletions cpp/test/reader/table_view/tsfile_reader_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class TsFileTableReaderTest : public ::testing::Test {
storage::Tablet tablet(table_schema->get_table_name(),
table_schema->get_measurement_names(),
table_schema->get_data_types(),
table_schema->get_column_categories());
table_schema->get_column_categories(),
device_num * num_timestamp_per_device);

char* literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
Expand Down Expand Up @@ -199,8 +200,18 @@ class TsFileTableReaderTest : public ::testing::Test {

TEST_F(TsFileTableReaderTest, TableModelQuery) { test_table_model_query(); }

TEST_F(TsFileTableReaderTest, TableModelQueryOnePage) {
TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) {
int prev_config = g_config_value_.page_writer_max_point_num_;
g_config_value_.page_writer_max_point_num_ = 5;
test_table_model_query(g_config_value_.page_writer_max_point_num_);
g_config_value_.page_writer_max_point_num_ = prev_config;
}

// Runs the table-model query test with page_writer_max_point_num_ set to
// 10000, then restores the previous global setting.
TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) {
    const int saved_max_point_num = g_config_value_.page_writer_max_point_num_;
    g_config_value_.page_writer_max_point_num_ = 10000;
    test_table_model_query(g_config_value_.page_writer_max_point_num_);
    // Put the global config back so this override does not persist.
    g_config_value_.page_writer_max_point_num_ = saved_max_point_num;
}

TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {
Expand Down