Skip to content

Commit

Permalink
variant predefine
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Sep 9, 2024
1 parent 40935f9 commit 99b7e12
Show file tree
Hide file tree
Showing 65 changed files with 1,406 additions and 207 deletions.
32 changes: 28 additions & 4 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/rowset/segment_v2/vertical_segment_writer.h"

#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/segment_v2.pb.h>
#include <parallel_hashmap/phmap.h>

Expand All @@ -42,7 +43,8 @@
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep
#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
#include "olap/rowset/segment_creator.h"
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
Expand All @@ -64,11 +66,15 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/io/reader_buffer.h"
#include "vec/json/path_in_data.h"
#include "vec/jsonb/serialize.h"
#include "vec/olap/olap_data_convertor.h"

Expand Down Expand Up @@ -625,6 +631,10 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
remove_nullable(column_ref)->assume_mutable_ref());
const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];

std::map<std::string, TabletColumnPtr> typed_columns;
for (const auto& col : parent_column->get_sub_columns()) {
typed_columns[col->name()] = col;
}
// generate column info by entry info
auto generate_column_info = [&](const auto& entry) {
const std::string& column_name =
Expand All @@ -635,6 +645,12 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
auto full_path = full_path_builder.append(parent_column->name_lower_case(), false)
.append(entry->path.get_parts(), false)
.build();
if (typed_columns.contains(entry->path.get_path())) {
TabletColumn typed_column = *typed_columns[entry->path.get_path()];
typed_column.set_path_info(full_path);
typed_column.set_parent_unique_id(parent_column->unique_id());
return typed_column;
}
return vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {
Expand All @@ -654,14 +670,22 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
CHECK(entry->data.is_finalized());
int current_column_id = column_id++;
TabletColumn tablet_column = generate_column_info(entry);
DataTypePtr storage_type =
vectorized::DataTypeFactory::instance().create_data_type(tablet_column);
DataTypePtr finalized_type = entry->data.get_least_common_type();
vectorized::ColumnPtr current_column =
entry->data.get_finalized_column_ptr()->get_ptr();
if (!storage_type->equals(*finalized_type)) {
RETURN_IF_ERROR(vectorized::schema_util::cast_column(
{current_column, finalized_type, ""}, storage_type, &current_column));
}
vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column,
_flush_schema);
RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column,
_flush_schema));
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
{entry->data.get_finalized_column_ptr()->get_ptr(),
entry->data.get_least_common_type(), tablet_column.name()},
data.row_pos, data.num_rows, current_column_id));
{current_column->get_ptr(), storage_type, tablet_column.name()}, data.row_pos,
data.num_rows, current_column_id));
// convert column data from engine format to storage layer format
auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id);
if (!status.ok()) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ TabletColumn TabletReader::materialize_column(const TabletColumn& orig) {
auto cast_type = _reader_context.target_cast_type_for_variants.at(orig.name());
FieldType filed_type = TabletColumn::get_field_type_by_type(cast_type.type);
column_with_cast_type.set_type(filed_type);
column_with_cast_type.set_frac(cast_type.scale);
column_with_cast_type.set_precision(cast_type.precision);
return column_with_cast_type;
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ FieldType TabletColumn::get_field_type_by_type(PrimitiveType primitiveType) {
return FieldType::OLAP_FIELD_TYPE_JSONB;
case PrimitiveType::TYPE_VARIANT:
return FieldType::OLAP_FIELD_TYPE_VARIANT;
case PrimitiveType::TYPE_IPV4:
return FieldType::OLAP_FIELD_TYPE_IPV4;
case PrimitiveType::TYPE_IPV6:
return FieldType::OLAP_FIELD_TYPE_IPV6;
case PrimitiveType::TYPE_LAMBDA_FUNCTION:
return FieldType::OLAP_FIELD_TYPE_UNKNOWN; // Not implemented
case PrimitiveType::TYPE_AGG_STATE:
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ class TabletColumn {
const std::vector<TabletColumnPtr>& sparse_columns() const;
size_t num_sparse_columns() const { return _num_sparse_columns; }

void set_precision(int32_t precision) {
_precision = precision;
_is_decimal = true;
}
void set_frac(int32_t frac) { _frac = frac; }

Status check_valid() const {
if (type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
type() != FieldType::OLAP_FIELD_TYPE_STRUCT &&
Expand Down
16 changes: 16 additions & 0 deletions be/src/runtime/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <utility>

#include "olap/olap_define.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"

namespace doris {
Expand Down Expand Up @@ -108,6 +109,21 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>& types, int* idx)
contains_nulls.push_back(node.contains_nulls[1]);
break;
}
case TTypeNodeType::VARIANT: {
// complex variant type
DCHECK(!node.__isset.scalar_type);
DCHECK_LT(*idx, types.size() - 1);
DCHECK(!node.__isset.contains_nulls);
type = TYPE_VARIANT;
contains_nulls.reserve(node.struct_fields.size());
for (size_t i = 0; i < node.struct_fields.size(); i++) {
++(*idx);
children.push_back(TypeDescriptor(types, idx));
field_names.push_back(node.struct_fields[i].name);
contains_nulls.push_back(node.struct_fields[i].contains_null);
}
break;
}
default:
DCHECK(false) << node.type;
}
Expand Down
135 changes: 106 additions & 29 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_object.h"
#include "vec/data_types/get_least_supertype.h"
#include "vec/functions/function_binary_arithmetic.h"
#include "vec/json/path_in_data.h"

#ifdef __AVX2__
Expand All @@ -84,14 +85,16 @@
namespace doris::vectorized {
namespace {

DataTypePtr create_array_of_type(TypeIndex type, size_t num_dimensions, bool is_nullable) {
DataTypePtr create_array_of_type(TypeIndex type, size_t num_dimensions, bool is_nullable,
int precision = -1, int scale = -1) {
if (type == ColumnObject::MOST_COMMON_TYPE_ID) {
// JSONB type MUST NOT wrapped in ARRAY column, it should be top level.
// So we ignored num_dimensions.
return is_nullable ? make_nullable(std::make_shared<ColumnObject::MostCommonType>())
: std::make_shared<ColumnObject::MostCommonType>();
}
DataTypePtr result = DataTypeFactory::instance().create_data_type(type, is_nullable);
DataTypePtr result =
DataTypeFactory::instance().create_data_type(type, is_nullable, precision, scale);
for (size_t i = 0; i < num_dimensions; ++i) {
result = std::make_shared<DataTypeArray>(result);
if (is_nullable) {
Expand Down Expand Up @@ -341,7 +344,45 @@ void get_field_info_impl(const Field& field, FieldInfo* info) {
};
}

void get_base_field_info(const Field& field, FieldInfo* info) {
if (field.get_detailed_type_info().type == TypeIndex::Array) {
if (field.safe_get<Array>().empty()) {
info->scalar_type_id = TypeIndex::Nothing;
++info->num_dimensions;
info->have_nulls = true;
info->need_convert = false;
} else {
++info->num_dimensions;
get_base_field_info(field.safe_get<Array>()[0], info);
}
return;
}

// handle scalar types
info->scalar_type_id = field.get_detailed_type_info().type;
info->have_nulls = true;
info->need_convert = false;
info->scale = field.get_detailed_type_info().scale;
info->precision = field.get_detailed_type_info().precision;

// Currently the jsonb type should be the top level type, so we should not wrap it in array,
// see create_array_of_type.
// TODO we need to support array<jsonb> correctly
if (UNLIKELY(field.get_detailed_type_info().type == TypeIndex::JSONB &&
info->num_dimensions > 0)) {
info->num_dimensions = 0;
info->need_convert = true;
}
}

void get_field_info(const Field& field, FieldInfo* info) {
if (field.get_detailed_type_info().type != TypeIndex::Nothing) {
// Currently we support specify predefined schema for other types include decimal, datetime ...etc
// so we should set specified info to create correct types, and those predefined types are static and
// type no need to deduce
get_base_field_info(field, info);
return;
}
if (field.is_complex_field()) {
get_field_info_impl<FieldVisitorToScalarType>(field, info);
} else {
Expand Down Expand Up @@ -424,7 +465,11 @@ void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
type_changed = true;
}
if (data.empty()) {
add_new_column_part(create_array_of_type(base_type.idx, value_dim, is_nullable));
// Currently we support specify predefined schema for other types include decimal, datetime ...etc
// so we should set specified info to create correct types, and those predefined types are static and
// no conflict, so we can set them directly.
add_new_column_part(create_array_of_type(base_type.idx, value_dim, is_nullable,
info.precision, info.scale));
} else if (least_common_type.get_base_type_id() != base_type.idx && !base_type.is_nothing()) {
if (schema_util::is_conversion_required_between_integers(
base_type.idx, least_common_type.get_base_type_id())) {
Expand Down Expand Up @@ -947,38 +992,75 @@ void ColumnObject::Subcolumn::get(size_t n, Field& res) const {
return;
}
if (is_finalized()) {
if (least_common_type.get_base_type_id() == TypeIndex::JSONB) {
// JsonbFiled is special case
res = JsonbField();
}
get_finalized_column().get(n, res);
res = get_least_common_type()->get_type_field(get_finalized_column(), n);
return;
}

size_t ind = n;
if (ind < num_of_defaults_in_prefix) {
res = least_common_type.get()->get_default();
ColumnPtr part_column;
DataTypePtr part_type;
size_t part_index = 0;
if (!get_part_column_type_and_index(n, &part_column, &part_type, &part_index)) {
res = Null();
return;
}
res = vectorized::remove_nullable(part_type)->get_default();
part_column->get(part_index, res);
Field new_field;
convert_field_to_type(res, *least_common_type.get(), &new_field);
res = new_field;
// size_t ind = n;
// if (ind < num_of_defaults_in_prefix) {
// res = least_common_type.get()->get_default();
// return;
// }

// ind -= num_of_defaults_in_prefix;
// for (size_t i = 0; i < data.size(); ++i) {
// const auto& part = data[i];
// const auto& part_type = data_types[i];
// if (ind < part->size()) {
// res = vectorized::remove_nullable(part_type)->get_default();
// part->get(ind, res);
// Field new_field;
// convert_field_to_type(res, *least_common_type.get(), &new_field);
// res = new_field;
// return;
// }

// ind -= part->size();
// }

// throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range",
// n);
}

bool ColumnObject::Subcolumn::get_part_column_type_and_index(const size_t row,
ColumnPtr* part_column,
DataTypePtr* part_type,
size_t* index) const {
if (row < num_of_defaults_in_prefix) {
return false;
}
if (is_finalized()) {
*part_column = get_finalized_column_ptr();
*part_type = get_least_common_type();
*index = row;
return true;
}
size_t ind = row;
ind -= num_of_defaults_in_prefix;
for (size_t i = 0; i < data.size(); ++i) {
const auto& part = data[i];
const auto& part_type = data_types[i];
const auto& type = data_types[i];
if (ind < part->size()) {
res = vectorized::remove_nullable(part_type)->get_default();
part->get(ind, res);
Field new_field;
convert_field_to_type(res, *least_common_type.get(), &new_field);
res = new_field;
return;
*part_column = part;
*part_type = type;
*index = ind;
return true;
}

ind -= part->size();
}

throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range",
n);
row);
}

Field ColumnObject::operator[](size_t n) const {
Expand Down Expand Up @@ -1347,12 +1429,6 @@ Status find_and_set_leave_value(const IColumn* column, const PathInData& path,
rapidjson::Value& root,
rapidjson::Document::AllocatorType& allocator, Arena& mem_pool,
int row) {
// sanitize type and column
if (column->get_name() != type->create_column()->get_name()) {
return Status::InternalError(
"failed to set value for path {}, expected type {}, but got {} at row {}",
path.get_path(), type->get_name(), column->get_name(), row);
}
const auto* nullable = check_and_get_column<ColumnNullable>(column);
if (skip_empty_json(nullable, type, row, path)) {
return Status::OK();
Expand All @@ -1367,7 +1443,8 @@ Status find_and_set_leave_value(const IColumn* column, const PathInData& path,
<< ", root: " << std::string(buffer.GetString(), buffer.GetSize());
return Status::NotFound("Not found path {}", path.get_path());
}
RETURN_IF_ERROR(type_serde->write_one_cell_to_json(*column, *target, allocator, mem_pool, row));
RETURN_IF_ERROR(
type_serde->write_one_cell_to_json(*column, *target, allocator, mem_pool, row, type));
return Status::OK();
}

Expand Down
19 changes: 18 additions & 1 deletion be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ struct FieldInfo {
/// we need to convert scalars to the common type.
bool need_convert;
/// Number of dimension in array. 0 if field is scalar.
size_t num_dimensions;
size_t num_dimensions = 0;

// decimal info
int scale = 0;
int precision = 0;
};

void get_field_info(const Field& field, FieldInfo* info);
Expand Down Expand Up @@ -120,6 +124,10 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

const DataTypePtr& get_least_common_type() const { return least_common_type.get(); }

const TypeIndex& get_least_common_base_type_id() const {
return least_common_type.get_base_type_id();
}

const DataTypePtr& get_least_common_typeBase() const {
return least_common_type.get_base();
}
Expand Down Expand Up @@ -176,6 +184,15 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
void remove_nullable();

void add_new_column_part(DataTypePtr type);
// get the column type and index of a specified row
// Example row = 10
// row 0-7 is column with int type
// row 7-8 is column with bigint type
// row 9-15 is column with string type
// the row 10 is string type and index is 1
// Return false if row < num_of_defaults_in_prefix
bool get_part_column_type_and_index(const size_t row, ColumnPtr* part_column,
DataTypePtr* part_type, size_t* index) const;

friend class ColumnObject;

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/subcolumn_tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#pragma once
#include <memory>
#include <unordered_map>

#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
Expand Down
Loading

0 comments on commit 99b7e12

Please sign in to comment.