Skip to content

Commit

Permalink
modify fe
Browse files Browse the repository at this point in the history
modify be
  • Loading branch information
eldenmoon committed Sep 27, 2024
1 parent c87c904 commit ffcb52e
Show file tree
Hide file tree
Showing 71 changed files with 1,289 additions and 241 deletions.
5 changes: 1 addition & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1006,10 +1006,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
}
RETURN_IF_ERROR(
iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
// iterator_hint.reset(nullptr);
// Get it's inner field, for JSONB case
vectorized::Field field = remove_nullable(storage_type)->get_default();
file_storage_column->get(0, field);
vectorized::Field field = storage_type->get_type_field(*file_storage_column, 0);
result->insert(field);
} else {
int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id())
Expand Down
32 changes: 28 additions & 4 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/rowset/segment_v2/vertical_segment_writer.h"

#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/segment_v2.pb.h>
#include <parallel_hashmap/phmap.h>

Expand All @@ -42,7 +43,8 @@
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep
#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
#include "olap/rowset/segment_creator.h"
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
Expand All @@ -64,11 +66,15 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/io/reader_buffer.h"
#include "vec/json/path_in_data.h"
#include "vec/jsonb/serialize.h"
#include "vec/olap/olap_data_convertor.h"

Expand Down Expand Up @@ -596,6 +602,10 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
remove_nullable(column_ref)->assume_mutable_ref());
const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];

std::map<std::string, TabletColumnPtr> typed_columns;
for (const auto& col : parent_column->get_sub_columns()) {
typed_columns[col->name()] = col;
}
// generate column info by entry info
auto generate_column_info = [&](const auto& entry) {
const std::string& column_name =
Expand All @@ -606,6 +616,12 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
auto full_path = full_path_builder.append(parent_column->name_lower_case(), false)
.append(entry->path.get_parts(), false)
.build();
if (typed_columns.contains(entry->path.get_path())) {
TabletColumn typed_column = *typed_columns[entry->path.get_path()];
typed_column.set_path_info(full_path);
typed_column.set_parent_unique_id(parent_column->unique_id());
return typed_column;
}
return vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {
Expand All @@ -625,14 +641,22 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
CHECK(entry->data.is_finalized());
int current_column_id = column_id++;
TabletColumn tablet_column = generate_column_info(entry);
DataTypePtr storage_type =
vectorized::DataTypeFactory::instance().create_data_type(tablet_column);
DataTypePtr finalized_type = entry->data.get_least_common_type();
vectorized::ColumnPtr current_column =
entry->data.get_finalized_column_ptr()->get_ptr();
if (!storage_type->equals(*finalized_type)) {
RETURN_IF_ERROR(vectorized::schema_util::cast_column(
{current_column, finalized_type, ""}, storage_type, &current_column));
}
vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column,
_flush_schema);
RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column,
_flush_schema));
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
{entry->data.get_finalized_column_ptr()->get_ptr(),
entry->data.get_least_common_type(), tablet_column.name()},
data.row_pos, data.num_rows, current_column_id));
{current_column->get_ptr(), storage_type, tablet_column.name()}, data.row_pos,
data.num_rows, current_column_id));
// convert column data from engine format to storage layer format
auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id);
if (!status.ok()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ TabletColumn TabletReader::materialize_column(const TabletColumn& orig) {
cast_type.type);
}
column_with_cast_type.set_type(filed_type);
column_with_cast_type.set_precision_frac(cast_type.precision, cast_type.scale);
return column_with_cast_type;
}

Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ FieldType TabletColumn::get_field_type_by_type(PrimitiveType primitiveType) {
return FieldType::OLAP_FIELD_TYPE_JSONB;
case PrimitiveType::TYPE_VARIANT:
return FieldType::OLAP_FIELD_TYPE_VARIANT;
case PrimitiveType::TYPE_IPV4:
return FieldType::OLAP_FIELD_TYPE_IPV4;
case PrimitiveType::TYPE_IPV6:
return FieldType::OLAP_FIELD_TYPE_IPV6;
case PrimitiveType::TYPE_LAMBDA_FUNCTION:
return FieldType::OLAP_FIELD_TYPE_UNKNOWN; // Not implemented
case PrimitiveType::TYPE_AGG_STATE:
Expand Down Expand Up @@ -608,8 +612,10 @@ void TabletColumn::to_schema_pb(ColumnPB* column) const {
if (_has_default_value) {
column->set_default_value(_default_value);
}
if (_is_decimal) {
if (_precision >= 0) {
column->set_precision(_precision);
}
if (_frac >= 0) {
column->set_frac(_frac);
}
column->set_length(_length);
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <parallel_hashmap/phmap.h>

#include <algorithm>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -177,6 +178,12 @@ class TabletColumn {
const std::vector<TabletColumnPtr>& sparse_columns() const;
size_t num_sparse_columns() const { return _num_sparse_columns; }

void set_precision_frac(int32_t precision, int32_t frac, bool is_decimal = true) {
_precision = precision;
_frac = frac;
_is_decimal = is_decimal;
}

Status check_valid() const {
if (type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
type() != FieldType::OLAP_FIELD_TYPE_STRUCT &&
Expand Down
16 changes: 16 additions & 0 deletions be/src/runtime/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <utility>

#include "olap/olap_define.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"

namespace doris {
Expand Down Expand Up @@ -108,6 +109,21 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>& types, int* idx)
contains_nulls.push_back(node.contains_nulls[1]);
break;
}
case TTypeNodeType::VARIANT: {
// complex variant type
DCHECK(!node.__isset.scalar_type);
DCHECK_LT(*idx, types.size() - 1);
DCHECK(!node.__isset.contains_nulls);
type = TYPE_VARIANT;
contains_nulls.reserve(node.struct_fields.size());
for (size_t i = 0; i < node.struct_fields.size(); i++) {
++(*idx);
children.push_back(TypeDescriptor(types, idx));
field_names.push_back(node.struct_fields[i].name);
contains_nulls.push_back(node.struct_fields[i].contains_null);
}
break;
}
default:
DCHECK(false) << node.type;
}
Expand Down
67 changes: 51 additions & 16 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "exprs/json_functions.h"
#include "olap/olap_common.h"
#include "util/defer_op.h"
#include "util/jsonb_utils.h"
#include "util/simd/bits.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/helpers.h"
Expand Down Expand Up @@ -73,6 +74,7 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_object.h"
#include "vec/data_types/get_least_supertype.h"
#include "vec/functions/function_binary_arithmetic.h"
#include "vec/json/path_in_data.h"

#ifdef __AVX2__
Expand All @@ -84,14 +86,16 @@
namespace doris::vectorized {
namespace {

DataTypePtr create_array_of_type(TypeIndex type, size_t num_dimensions, bool is_nullable) {
DataTypePtr create_array_of_type(TypeIndex type, size_t num_dimensions, bool is_nullable,
int precision = -1, int scale = -1) {
if (type == ColumnObject::MOST_COMMON_TYPE_ID) {
// JSONB type MUST NOT wrapped in ARRAY column, it should be top level.
// So we ignored num_dimensions.
return is_nullable ? make_nullable(std::make_shared<ColumnObject::MostCommonType>())
: std::make_shared<ColumnObject::MostCommonType>();
}
DataTypePtr result = DataTypeFactory::instance().create_data_type(type, is_nullable);
DataTypePtr result =
DataTypeFactory::instance().create_data_type(type, is_nullable, precision, scale);
for (size_t i = 0; i < num_dimensions; ++i) {
result = std::make_shared<DataTypeArray>(result);
if (is_nullable) {
Expand Down Expand Up @@ -341,7 +345,44 @@ void get_field_info_impl(const Field& field, FieldInfo* info) {
};
}

void get_base_field_info(const Field& field, FieldInfo* info) {
if (field.get_type_id() == TypeIndex::Array) {
if (field.safe_get<Array>().empty()) {
info->scalar_type_id = TypeIndex::Nothing;
++info->num_dimensions;
info->have_nulls = true;
info->need_convert = false;
} else {
++info->num_dimensions;
get_base_field_info(field.safe_get<Array>()[0], info);
}
return;
}

// handle scalar types
info->scalar_type_id = field.get_type_id();
info->have_nulls = true;
info->need_convert = false;
info->scale = field.get_scale();
info->precision = field.get_precision();

// Currently the jsonb type should be the top level type, so we should not wrap it in array,
// see create_array_of_type.
// TODO we need to support array<jsonb> correctly
if (UNLIKELY(field.get_type_id() == TypeIndex::JSONB && info->num_dimensions > 0)) {
info->num_dimensions = 0;
info->need_convert = true;
}
}

void get_field_info(const Field& field, FieldInfo* info) {
if (field.get_type_id() != TypeIndex::Nothing) {
// Currently we support specify predefined schema for other types include decimal, datetime ...etc
// so we should set specified info to create correct types, and those predefined types are static and
// type no need to deduce
get_base_field_info(field, info);
return;
}
if (field.is_complex_field()) {
get_field_info_impl<FieldVisitorToScalarType>(field, info);
} else {
Expand Down Expand Up @@ -424,7 +465,11 @@ void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
type_changed = true;
}
if (data.empty()) {
add_new_column_part(create_array_of_type(base_type.idx, value_dim, is_nullable));
// Currently we support specify predefined schema for other types include decimal, datetime ...etc
// so we should set specified info to create correct types, and those predefined types are static and
// no conflict, so we can set them directly.
add_new_column_part(create_array_of_type(base_type.idx, value_dim, is_nullable,
info.precision, info.scale));
} else if (least_common_type.get_base_type_id() != base_type.idx && !base_type.is_nothing()) {
if (schema_util::is_conversion_required_between_integers(
base_type.idx, least_common_type.get_base_type_id())) {
Expand Down Expand Up @@ -947,14 +992,9 @@ void ColumnObject::Subcolumn::get(size_t n, Field& res) const {
return;
}
if (is_finalized()) {
if (least_common_type.get_base_type_id() == TypeIndex::JSONB) {
// JsonbFiled is special case
res = JsonbField();
}
get_finalized_column().get(n, res);
res = get_least_common_type()->get_type_field(get_finalized_column(), n);
return;
}

size_t ind = n;
if (ind < num_of_defaults_in_prefix) {
res = least_common_type.get()->get_default();
Expand Down Expand Up @@ -1347,12 +1387,6 @@ Status find_and_set_leave_value(const IColumn* column, const PathInData& path,
rapidjson::Value& root,
rapidjson::Document::AllocatorType& allocator, Arena& mem_pool,
int row) {
// sanitize type and column
if (column->get_name() != type->create_column()->get_name()) {
return Status::InternalError(
"failed to set value for path {}, expected type {}, but got {} at row {}",
path.get_path(), type->get_name(), column->get_name(), row);
}
const auto* nullable = check_and_get_column<ColumnNullable>(column);
if (skip_empty_json(nullable, type, row, path)) {
return Status::OK();
Expand All @@ -1367,7 +1401,8 @@ Status find_and_set_leave_value(const IColumn* column, const PathInData& path,
<< ", root: " << std::string(buffer.GetString(), buffer.GetSize());
return Status::NotFound("Not found path {}", path.get_path());
}
RETURN_IF_ERROR(type_serde->write_one_cell_to_json(*column, *target, allocator, mem_pool, row));
RETURN_IF_ERROR(
type_serde->write_one_cell_to_json(*column, *target, allocator, mem_pool, row, type));
return Status::OK();
}

Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ struct FieldInfo {
/// we need to convert scalars to the common type.
bool need_convert;
/// Number of dimension in array. 0 if field is scalar.
size_t num_dimensions;
size_t num_dimensions = 0;

// decimal info
int scale = 0;
int precision = 0;
};

void get_field_info(const Field& field, FieldInfo* info);
Expand Down Expand Up @@ -120,6 +124,10 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

const DataTypePtr& get_least_common_type() const { return least_common_type.get(); }

const TypeIndex& get_least_common_base_type_id() const {
return least_common_type.get_base_type_id();
}

const DataTypePtr& get_least_common_typeBase() const {
return least_common_type.get_base();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/subcolumn_tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#pragma once
#include <memory>
#include <unordered_map>

#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
Expand Down
Loading

0 comments on commit ffcb52e

Please sign in to comment.