From 7ccf5a61ef2e37b9f0df1bd336d48a03b834da45 Mon Sep 17 00:00:00 2001 From: jacktengg Date: Wed, 4 Dec 2024 16:36:10 +0800 Subject: [PATCH] [env](compile) open compile check for table writers --- .../exec/format/table/iceberg/struct_like.h | 5 +-- be/src/vec/sink/vrow_distribution.cpp | 16 ++++++-- be/src/vec/sink/vrow_distribution.h | 2 +- .../vec/sink/writer/async_result_writer.cpp | 3 +- be/src/vec/sink/writer/async_result_writer.h | 2 +- .../vec/sink/writer/iceberg/partition_data.h | 15 ++------ .../writer/iceberg/partition_transformers.h | 38 +++++++++++++------ .../writer/iceberg/viceberg_table_writer.cpp | 1 + be/src/vec/sink/writer/vhive_table_writer.cpp | 1 + .../vec/sink/writer/vmysql_table_writer.cpp | 8 ++-- be/src/vec/sink/writer/vodbc_table_writer.cpp | 1 + be/src/vec/sink/writer/vtablet_writer.cpp | 11 +++--- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 3 +- 13 files changed, 64 insertions(+), 42 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg/struct_like.h b/be/src/vec/exec/format/table/iceberg/struct_like.h index 23d394066fc639..cf02e2d1eb8a25 100644 --- a/be/src/vec/exec/format/table/iceberg/struct_like.h +++ b/be/src/vec/exec/format/table/iceberg/struct_like.h @@ -26,11 +26,8 @@ namespace iceberg { class StructLike { public: virtual ~StructLike() = default; - virtual int size() const = 0; - virtual std::any get(int pos) const = 0; - - virtual void set(int pos, const std::any& value) = 0; + virtual std::any get(size_t pos) const = 0; }; } // namespace iceberg diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index f374064c0af767..4a9790a4437272 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -25,6 +25,7 @@ #include #include +#include "common/cast_set.h" #include "common/logging.h" #include "common/status.h" #include "runtime/client_cache.h" @@ -225,7 +226,10 @@ void VRowDistribution::_filter_block_by_skip(vectorized::Block* block, auto& 
partition_ids = row_part_tablet_id.partition_ids; auto& tablet_ids = row_part_tablet_id.tablet_ids; - for (size_t i = 0; i < block->rows(); i++) { + auto rows = block->rows(); + // row count of a block should not exceed UINT32_MAX + auto rows_uint32 = cast_set(rows); + for (uint32_t i = 0; i < rows_uint32; i++) { if (!_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); @@ -250,7 +254,10 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( auto& tablet_ids = row_part_tablet_id.tablet_ids; if (const auto* nullable_column = vectorized::check_and_get_column(*filter_column)) { - for (size_t i = 0; i < block->rows(); i++) { + auto rows = block->rows(); + // row count of a block should not exceed UINT32_MAX + auto rows_uint32 = cast_set(rows); + for (uint32_t i = 0; i < rows_uint32; i++) { if (nullable_column->get_bool_inline(i) && !_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); @@ -267,7 +274,10 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( _filter_block_by_skip(block, row_part_tablet_id); } else { const auto& filter = assert_cast(*filter_column).get_data(); - for (size_t i = 0; i < block->rows(); i++) { + auto rows = block->rows(); + // row count of a block should not exceed UINT32_MAX + auto rows_uint32 = cast_set(rows); + for (uint32_t i = 0; i < rows_uint32; i++) { if (filter[i] != 0 && !_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 40202556290ea8..6248a28dba5821 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -48,7 +48,7 @@ class VNodeChannel; // class RowPartTabletIds { public: - std::vector row_ids; + std::vector row_ids; std::vector partition_ids; std::vector tablet_ids; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp 
index c17b84b2dbe1de..65210a53ec3b55 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -32,6 +32,7 @@ class RowDescriptor; class TExpr; namespace vectorized { +#include "common/compile_check_begin.h" AsyncResultWriter::AsyncResultWriter(const doris::vectorized::VExprContextSPtrs& output_expr_ctxs, std::shared_ptr dep, @@ -225,7 +226,7 @@ void AsyncResultWriter::_return_free_block(std::unique_ptr b) { } std::unique_ptr AsyncResultWriter::_get_free_block(doris::vectorized::Block* block, - int rows) { + size_t rows) { std::unique_ptr b; if (!_free_blocks.try_dequeue(b)) { b = block->create_same_struct_block(rows, true); diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 36bca48358a8ab..513f2aa7984ca5 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -76,7 +76,7 @@ class AsyncResultWriter : public ResultWriter { Status _projection_block(Block& input_block, Block* output_block); const VExprContextSPtrs& _vec_output_expr_ctxs; - std::unique_ptr _get_free_block(Block*, int rows); + std::unique_ptr _get_free_block(Block*, size_t rows); void _return_free_block(std::unique_ptr); diff --git a/be/src/vec/sink/writer/iceberg/partition_data.h b/be/src/vec/sink/writer/iceberg/partition_data.h index 512dbd47904108..d3dfb1e8eccb87 100644 --- a/be/src/vec/sink/writer/iceberg/partition_data.h +++ b/be/src/vec/sink/writer/iceberg/partition_data.h @@ -21,31 +21,24 @@ namespace doris { namespace vectorized { +#include "common/compile_check_begin.h" class PartitionData : public iceberg::StructLike { public: explicit PartitionData(std::vector partition_values) : _partition_values(std::move(partition_values)) {} - int size() const override { return _partition_values.size(); } - - std::any get(int pos) const override { - if (pos < 0 || pos >= _partition_values.size()) { + std::any get(size_t pos) const 
override { + if (pos >= _partition_values.size()) { throw std::out_of_range("Index out of range"); } return _partition_values[pos]; } - void set(int pos, const std::any& value) override { - if (pos < 0 || pos >= _partition_values.size()) { - throw std::out_of_range("Index out of range"); - } - _partition_values[pos] = value; - } - private: std::vector _partition_values; }; } // namespace vectorized } // namespace doris +#include "common/compile_check_end.h" diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h b/be/src/vec/sink/writer/iceberg/partition_transformers.h index 84ee3029cdd18f..79eb385b298a8f 100644 --- a/be/src/vec/sink/writer/iceberg/partition_transformers.h +++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h @@ -30,6 +30,7 @@ class PartitionField; }; // namespace iceberg namespace vectorized { +#include "common/compile_check_begin.h" class IColumn; class PartitionColumnTransform; @@ -174,7 +175,7 @@ class StringTruncatePartitionColumnTransform : public PartitionColumnTransform { temp_arguments[0] = 0; // str column temp_arguments[1] = 1; // pos temp_arguments[2] = 2; // width - size_t result_column_id = 3; + uint32_t result_column_id = 3; SubstringUtil::substring_execute(temp_block, temp_arguments, result_column_id, temp_block.rows()); @@ -623,9 +624,9 @@ class DateBucketPartitionColumnTransform : public PartitionColumnTransform { DateV2Value value = binary_cast>(*(UInt32*)p_in); - int32_t days_from_unix_epoch = value.daynr() - 719528; - Int64 long_value = static_cast(days_from_unix_epoch); - uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + int64_t days_from_unix_epoch = value.daynr() - 719528; + uint32_t hash_value = HashUtil::murmur_hash3_32(&days_from_unix_epoch, + sizeof(days_from_unix_epoch), 0); *p_out = (hash_value & INT32_MAX) % _bucket_num; ++p_in; @@ -836,7 +837,9 @@ class DateYearPartitionColumnTransform : public PartitionColumnTransform { while (p_in < end_in) { DateV2Value 
value = binary_cast>(*(UInt32*)p_in); - *p_out = datetime_diff(PartitionColumnTransformUtils::epoch_date(), value); + // datetime_diff actually returns int + *p_out = cast_set( + datetime_diff(PartitionColumnTransformUtils::epoch_date(), value)); ++p_in; ++p_out; } @@ -906,7 +909,9 @@ class TimestampYearPartitionColumnTransform : public PartitionColumnTransform { while (p_in < end_in) { DateV2Value value = binary_cast>(*(UInt64*)p_in); - *p_out = datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value); + // datetime_diff actually returns int + *p_out = cast_set( + datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value)); ++p_in; ++p_out; } @@ -976,7 +981,9 @@ class DateMonthPartitionColumnTransform : public PartitionColumnTransform { while (p_in < end_in) { DateV2Value value = binary_cast>(*(UInt32*)p_in); - *p_out = datetime_diff(PartitionColumnTransformUtils::epoch_date(), value); + // datetime_diff actually returns int + *p_out = cast_set( + datetime_diff(PartitionColumnTransformUtils::epoch_date(), value)); ++p_in; ++p_out; } @@ -1046,7 +1053,9 @@ class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform { while (p_in < end_in) { DateV2Value value = binary_cast>(*(UInt64*)p_in); - *p_out = datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value); + // datetime_diff actually returns int + *p_out = cast_set( + datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value)); ++p_in; ++p_out; } @@ -1116,7 +1125,9 @@ class DateDayPartitionColumnTransform : public PartitionColumnTransform { while (p_in < end_in) { DateV2Value value = binary_cast>(*(UInt32*)p_in); - *p_out = datetime_diff(PartitionColumnTransformUtils::epoch_date(), value); + // datetime_diff actually returns int + *p_out = cast_set( + datetime_diff(PartitionColumnTransformUtils::epoch_date(), value)); ++p_in; ++p_out; } @@ -1192,7 +1203,9 @@ class TimestampDayPartitionColumnTransform : public PartitionColumnTransform { while (p_in < 
end_in) { DateV2Value value = binary_cast>(*(UInt64*)p_in); - *p_out = datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value); + // datetime_diff actually returns int + *p_out = cast_set( + datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value)); ++p_in; ++p_out; } @@ -1267,7 +1280,9 @@ class TimestampHourPartitionColumnTransform : public PartitionColumnTransform { while (p_in < end_in) { DateV2Value value = binary_cast>(*(UInt64*)p_in); - *p_out = datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value); + // hour diff wouldn't overflow int32 + *p_out = cast_set( + datetime_diff(PartitionColumnTransformUtils::epoch_datetime(), value)); ++p_in; ++p_out; } @@ -1333,3 +1348,4 @@ class VoidPartitionColumnTransform : public PartitionColumnTransform { } // namespace vectorized } // namespace doris +#include "common/compile_check_end.h" diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index 280cf8b810724c..29c97b59ea4dba 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -31,6 +31,7 @@ namespace doris { namespace vectorized { +#include "common/compile_check_begin.h" VIcebergTableWriter::VIcebergTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs, diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index 6eb478c01b7f92..17e6bba326f707 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -28,6 +28,7 @@ namespace doris { namespace vectorized { +#include "common/compile_check_begin.h" VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs, diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp index a0d47ffec1ede9..e4529c59c9abfb 
100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.cpp +++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp @@ -51,6 +51,7 @@ namespace doris { namespace vectorized { +#include "common/compile_check_begin.h" std::string MysqlConnInfo::debug_string() const { std::stringstream ss; @@ -124,9 +125,9 @@ Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block) { Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) { _insert_stmt_buffer.clear(); fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _conn_info.table_name); - int num_columns = _vec_output_expr_ctxs.size(); + size_t num_columns = _vec_output_expr_ctxs.size(); - for (int i = 0; i < num_columns; ++i) { + for (size_t i = 0; i < num_columns; ++i) { auto& column_ptr = block.get_by_position(i).column; auto& type_ptr = block.get_by_position(i).type; @@ -236,8 +237,7 @@ Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) { break; } case TYPE_DATETIMEV2: { - uint32_t int_val = - assert_cast(*column).get_data()[row]; + auto int_val = assert_cast(*column).get_data()[row]; DateV2Value value = binary_cast>(int_val); diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp index 19cb2e50109890..d99dfc56aaad9d 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp @@ -29,6 +29,7 @@ namespace doris { namespace vectorized { +#include "common/compile_check_begin.h" ODBCConnectorParam VOdbcTableWriter::create_connect_param(const doris::TDataSink& t_sink) { const TOdbcTableSink& t_odbc_sink = t_sink.odbc_table_sink; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 504ffb9cb749bf..55b6845b6bc871 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -92,6 +92,7 @@ namespace doris { class TExpr; namespace vectorized { +#include 
"common/compile_check_begin.h" bvar::Adder g_sink_write_bytes; bvar::PerSecond> g_sink_write_bytes_per_second("sink_throughput_byte", @@ -662,14 +663,14 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { _send_block_callback->clear_in_flight(); return; } - if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95F) { + if (double(compressed_bytes) >= double(config::brpc_max_body_size) * 0.95F) { LOG(WARNING) << "send block too large, this rpc may failed. send size: " << compressed_bytes << ", threshold: " << config::brpc_max_body_size << ", " << channel_info(); } } - int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; + auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { if (remain_ms <= 0 && !request->eos()) { cancel(fmt::format("{}, err: timeout", channel_info())); @@ -847,7 +848,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult if (result.has_load_channel_profile()) { TRuntimeProfileTree tprofile; const auto* buf = (const uint8_t*)result.load_channel_profile().data(); - uint32_t len = result.load_channel_profile().size(); + auto len = cast_set(result.load_channel_profile().size()); auto st = deserialize_thrift_msg(buf, &len, false, &tprofile); if (st.ok()) { _state->load_channel_profile()->update(tprofile); @@ -917,7 +918,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { PTabletWriterCancelRequest, DummyBrpcCallback>::create_unique(request, cancel_callback); - int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; + auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { remain_ms = config::min_load_rpc_timeout_ms; } @@ -1706,7 +1707,7 @@ void VTabletWriter::_generate_one_index_channel_payload( size_t row_cnt = row_ids.size(); - for (int i = 0; i < 
row_ids.size(); i++) { + for (size_t i = 0; i < row_ids.size(); i++) { // (tablet_id, VNodeChannel) where this tablet locate auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]); DCHECK(it != _channels[index_idx]->_channels_by_tablet.end()) diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 3dc58be3bcde88..646cdfab0603ec 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -55,6 +55,7 @@ #include "vec/sink/vtablet_finder.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, std::shared_ptr dep, @@ -353,7 +354,7 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector& r auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids; - for (int i = 0; i < row_ids.size(); i++) { + for (size_t i = 0; i < row_ids.size(); i++) { auto& tablet_id = tablet_ids[i]; auto it = rows_for_tablet.find(tablet_id); if (it == rows_for_tablet.end()) {