Skip to content

Commit

Permalink
[env](compile) open compile check for table writers
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Dec 4, 2024
1 parent d22bd83 commit 7ccf5a6
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 42 deletions.
5 changes: 1 addition & 4 deletions be/src/vec/exec/format/table/iceberg/struct_like.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@ namespace iceberg {
class StructLike {
public:
virtual ~StructLike() = default;
virtual int size() const = 0;

virtual std::any get(int pos) const = 0;

virtual void set(int pos, const std::any& value) = 0;
virtual std::any get(size_t pos) const = 0;
};

} // namespace iceberg
Expand Down
16 changes: 13 additions & 3 deletions be/src/vec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <memory>
#include <string>

#include "common/cast_set.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/client_cache.h"
Expand Down Expand Up @@ -225,7 +226,10 @@ void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
auto& partition_ids = row_part_tablet_id.partition_ids;
auto& tablet_ids = row_part_tablet_id.tablet_ids;

for (size_t i = 0; i < block->rows(); i++) {
auto rows = block->rows();
// row count of a block should not exceed UINT32_MAX
auto rows_uint32 = cast_set<uint32_t>(rows);
for (uint32_t i = 0; i < rows_uint32; i++) {
if (!_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
Expand All @@ -250,7 +254,10 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
auto& tablet_ids = row_part_tablet_id.tablet_ids;
if (const auto* nullable_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
for (size_t i = 0; i < block->rows(); i++) {
auto rows = block->rows();
// row count of a block should not exceed UINT32_MAX
auto rows_uint32 = cast_set<uint32_t>(rows);
for (uint32_t i = 0; i < rows_uint32; i++) {
if (nullable_column->get_bool_inline(i) && !_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
Expand All @@ -267,7 +274,10 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
_filter_block_by_skip(block, row_part_tablet_id);
} else {
const auto& filter = assert_cast<const vectorized::ColumnUInt8&>(*filter_column).get_data();
for (size_t i = 0; i < block->rows(); i++) {
auto rows = block->rows();
// row count of a block should not exceed UINT32_MAX
auto rows_uint32 = cast_set<uint32_t>(rows);
for (uint32_t i = 0; i < rows_uint32; i++) {
if (filter[i] != 0 && !_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class VNodeChannel;
// <row_idx, partition_id, tablet_id>
class RowPartTabletIds {
public:
std::vector<int64_t> row_ids;
std::vector<uint32_t> row_ids;
std::vector<int64_t> partition_ids;
std::vector<int64_t> tablet_ids;

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class RowDescriptor;
class TExpr;

namespace vectorized {
#include "common/compile_check_begin.h"

AsyncResultWriter::AsyncResultWriter(const doris::vectorized::VExprContextSPtrs& output_expr_ctxs,
std::shared_ptr<pipeline::Dependency> dep,
Expand Down Expand Up @@ -225,7 +226,7 @@ void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) {
}

std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Block* block,
int rows) {
size_t rows) {
std::unique_ptr<Block> b;
if (!_free_blocks.try_dequeue(b)) {
b = block->create_same_struct_block(rows, true);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/async_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class AsyncResultWriter : public ResultWriter {
Status _projection_block(Block& input_block, Block* output_block);
const VExprContextSPtrs& _vec_output_expr_ctxs;

std::unique_ptr<Block> _get_free_block(Block*, int rows);
std::unique_ptr<Block> _get_free_block(Block*, size_t rows);

void _return_free_block(std::unique_ptr<Block>);

Expand Down
15 changes: 4 additions & 11 deletions be/src/vec/sink/writer/iceberg/partition_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,24 @@

namespace doris {
namespace vectorized {
#include "common/compile_check_begin.h"

class PartitionData : public iceberg::StructLike {
public:
explicit PartitionData(std::vector<std::any> partition_values)
: _partition_values(std::move(partition_values)) {}

int size() const override { return _partition_values.size(); }

std::any get(int pos) const override {
if (pos < 0 || pos >= _partition_values.size()) {
std::any get(size_t pos) const override {
if (pos >= _partition_values.size()) {
throw std::out_of_range("Index out of range");
}
return _partition_values[pos];
}

void set(int pos, const std::any& value) override {
if (pos < 0 || pos >= _partition_values.size()) {
throw std::out_of_range("Index out of range");
}
_partition_values[pos] = value;
}

private:
std::vector<std::any> _partition_values;
};

} // namespace vectorized
} // namespace doris
#include "common/compile_check_end.h"
38 changes: 27 additions & 11 deletions be/src/vec/sink/writer/iceberg/partition_transformers.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class PartitionField;
}; // namespace iceberg

namespace vectorized {
#include "common/compile_check_begin.h"

class IColumn;
class PartitionColumnTransform;
Expand Down Expand Up @@ -174,7 +175,7 @@ class StringTruncatePartitionColumnTransform : public PartitionColumnTransform {
temp_arguments[0] = 0; // str column
temp_arguments[1] = 1; // pos
temp_arguments[2] = 2; // width
size_t result_column_id = 3;
uint32_t result_column_id = 3;

SubstringUtil::substring_execute(temp_block, temp_arguments, result_column_id,
temp_block.rows());
Expand Down Expand Up @@ -623,9 +624,9 @@ class DateBucketPartitionColumnTransform : public PartitionColumnTransform {
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);

int32_t days_from_unix_epoch = value.daynr() - 719528;
Int64 long_value = static_cast<Int64>(days_from_unix_epoch);
uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0);
int64_t days_from_unix_epoch = value.daynr() - 719528;
uint32_t hash_value = HashUtil::murmur_hash3_32(&days_from_unix_epoch,
sizeof(days_from_unix_epoch), 0);

*p_out = (hash_value & INT32_MAX) % _bucket_num;
++p_in;
Expand Down Expand Up @@ -836,7 +837,9 @@ class DateYearPartitionColumnTransform : public PartitionColumnTransform {
while (p_in < end_in) {
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
*p_out = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value);
// datetime_diff<YEAR> actually returns int
*p_out = cast_set<int, int64_t, false>(
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value));
++p_in;
++p_out;
}
Expand Down Expand Up @@ -906,7 +909,9 @@ class TimestampYearPartitionColumnTransform : public PartitionColumnTransform {
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
*p_out = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value);
// datetime_diff<YEAR> actually returns int
*p_out = cast_set<int, int64_t, false>(
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
Expand Down Expand Up @@ -976,7 +981,9 @@ class DateMonthPartitionColumnTransform : public PartitionColumnTransform {
while (p_in < end_in) {
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
*p_out = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value);
// datetime_diff<MONTH> actually returns int
*p_out = cast_set<int, int64_t, false>(
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value));
++p_in;
++p_out;
}
Expand Down Expand Up @@ -1046,7 +1053,9 @@ class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform {
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
*p_out = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value);
// datetime_diff<MONTH> actually returns int
*p_out = cast_set<int, int64_t, false>(
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
Expand Down Expand Up @@ -1116,7 +1125,9 @@ class DateDayPartitionColumnTransform : public PartitionColumnTransform {
while (p_in < end_in) {
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
*p_out = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value);
// datetime_diff<DAY> actually returns int
*p_out = cast_set<int, int64_t, false>(
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value));
++p_in;
++p_out;
}
Expand Down Expand Up @@ -1192,7 +1203,9 @@ class TimestampDayPartitionColumnTransform : public PartitionColumnTransform {
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
*p_out = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value);
// datetime_diff<DAY> actually returns int
*p_out = cast_set<int, int64_t, false>(
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
Expand Down Expand Up @@ -1267,7 +1280,9 @@ class TimestampHourPartitionColumnTransform : public PartitionColumnTransform {
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
*p_out = datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value);
// hour diff would't overflow int32
*p_out = cast_set<int, int64_t, false>(
datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
Expand Down Expand Up @@ -1333,3 +1348,4 @@ class VoidPartitionColumnTransform : public PartitionColumnTransform {

} // namespace vectorized
} // namespace doris
#include "common/compile_check_end.h"
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

namespace doris {
namespace vectorized {
#include "common/compile_check_begin.h"

VIcebergTableWriter::VIcebergTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs,
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vhive_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

namespace doris {
namespace vectorized {
#include "common/compile_check_begin.h"

VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs,
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/sink/writer/vmysql_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

namespace doris {
namespace vectorized {
#include "common/compile_check_begin.h"

std::string MysqlConnInfo::debug_string() const {
std::stringstream ss;
Expand Down Expand Up @@ -124,9 +125,9 @@ Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) {
_insert_stmt_buffer.clear();
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _conn_info.table_name);
int num_columns = _vec_output_expr_ctxs.size();
size_t num_columns = _vec_output_expr_ctxs.size();

for (int i = 0; i < num_columns; ++i) {
for (size_t i = 0; i < num_columns; ++i) {
auto& column_ptr = block.get_by_position(i).column;
auto& type_ptr = block.get_by_position(i).type;

Expand Down Expand Up @@ -236,8 +237,7 @@ Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) {
break;
}
case TYPE_DATETIMEV2: {
uint32_t int_val =
assert_cast<const vectorized::ColumnUInt64&>(*column).get_data()[row];
auto int_val = assert_cast<const vectorized::ColumnUInt64&>(*column).get_data()[row];
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(int_val);

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vodbc_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

namespace doris {
namespace vectorized {
#include "common/compile_check_begin.h"

ODBCConnectorParam VOdbcTableWriter::create_connect_param(const doris::TDataSink& t_sink) {
const TOdbcTableSink& t_odbc_sink = t_sink.odbc_table_sink;
Expand Down
11 changes: 6 additions & 5 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ namespace doris {
class TExpr;

namespace vectorized {
#include "common/compile_check_begin.h"

bvar::Adder<int64_t> g_sink_write_bytes;
bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_bytes_per_second("sink_throughput_byte",
Expand Down Expand Up @@ -662,14 +663,14 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
_send_block_callback->clear_in_flight();
return;
}
if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95F) {
if (double(compressed_bytes) >= double(config::brpc_max_body_size) * 0.95F) {
LOG(WARNING) << "send block too large, this rpc may failed. send size: "
<< compressed_bytes << ", threshold: " << config::brpc_max_body_size
<< ", " << channel_info();
}
}

int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request->eos()) {
cancel(fmt::format("{}, err: timeout", channel_info()));
Expand Down Expand Up @@ -847,7 +848,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult
if (result.has_load_channel_profile()) {
TRuntimeProfileTree tprofile;
const auto* buf = (const uint8_t*)result.load_channel_profile().data();
uint32_t len = result.load_channel_profile().size();
auto len = cast_set<uint32_t>(result.load_channel_profile().size());
auto st = deserialize_thrift_msg(buf, &len, false, &tprofile);
if (st.ok()) {
_state->load_channel_profile()->update(tprofile);
Expand Down Expand Up @@ -917,7 +918,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
PTabletWriterCancelRequest,
DummyBrpcCallback<PTabletWriterCancelResult>>::create_unique(request, cancel_callback);

int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
remain_ms = config::min_load_rpc_timeout_ms;
}
Expand Down Expand Up @@ -1706,7 +1707,7 @@ void VTabletWriter::_generate_one_index_channel_payload(

size_t row_cnt = row_ids.size();

for (int i = 0; i < row_ids.size(); i++) {
for (size_t i = 0; i < row_ids.size(); i++) {
// (tablet_id, VNodeChannel) where this tablet locate
auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]);
DCHECK(it != _channels[index_idx]->_channels_by_tablet.end())
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "vec/sink/vtablet_finder.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
std::shared_ptr<pipeline::Dependency> dep,
Expand Down Expand Up @@ -353,7 +354,7 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;

for (int i = 0; i < row_ids.size(); i++) {
for (size_t i = 0; i < row_ids.size(); i++) {
auto& tablet_id = tablet_ids[i];
auto it = rows_for_tablet.find(tablet_id);
if (it == rows_for_tablet.end()) {
Expand Down

0 comments on commit 7ccf5a6

Please sign in to comment.