Skip to content

Commit

Permalink
[Fix](Variant) fix variant partial update with row store enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Jun 25, 2024
1 parent 3e6e49e commit 1f06617
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 156 deletions.
8 changes: 0 additions & 8 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
// this structure and fill with Subcolumns sub items
mutable std::shared_ptr<rapidjson::Document> doc_structure;

// column with raw json strings
// used for quickly row store encoding
ColumnPtr rowstore_column;

using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
mutable std::vector<SubColumnWithName> _prev_positions;
Expand All @@ -259,10 +255,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
}

void set_rowstore_column(ColumnPtr col) { rowstore_column = col; }

ColumnPtr get_rowstore_column() const { return rowstore_column; }

Status serialize_one_row_to_string(int row, std::string* output) const;

Status serialize_one_row_to_string(int row, BufferWritable& output) const;
Expand Down
135 changes: 5 additions & 130 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,36 +492,8 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
var.assume_mutable_ref().finalize();

MutableColumnPtr variant_column;
bool record_raw_string_with_serialization = false;
// set
auto encode_rowstore = [&]() {
if (!ctx.record_raw_json_column) {
return Status::OK();
}
auto* var = static_cast<vectorized::ColumnObject*>(variant_column.get());
if (record_raw_string_with_serialization) {
// encode to raw json column
auto raw_column = vectorized::ColumnString::create();
for (size_t i = 0; i < var->rows(); ++i) {
std::string raw_str;
RETURN_IF_ERROR(var->serialize_one_row_to_string(i, &raw_str));
raw_column->insert_data(raw_str.c_str(), raw_str.size());
}
var->set_rowstore_column(raw_column->get_ptr());
} else {
// use original input json column
auto original_var_root = vectorized::check_and_get_column<vectorized::ColumnObject>(
remove_nullable(column_ref).get())
->get_root();
var->set_rowstore_column(original_var_root);
}
return Status::OK();
};

if (!var.is_scalar_variant()) {
variant_column = var.assume_mutable();
record_raw_string_with_serialization = true;
RETURN_IF_ERROR(encode_rowstore());
// already parsed
continue;
}
Expand Down Expand Up @@ -558,8 +530,6 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
result = ColumnNullable::create(result, null_map);
}
block.get_by_position(variant_pos[i]).column = result;
RETURN_IF_ERROR(encode_rowstore());
// block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
}
return Status::OK();
}
Expand Down Expand Up @@ -600,35 +570,6 @@ Status encode_variant_sparse_subcolumns(ColumnObject& column) {
return Status::OK();
}

static void _append_column(const TabletColumn& parent_variant,
const ColumnObject::Subcolumns::NodePtr& subcolumn,
TabletSchemaSPtr& to_append, bool is_sparse) {
// If column already exist in original tablet schema, then we pick common type
// and cast column to common type, and modify tablet column to common type,
// otherwise it's a new column
CHECK(to_append.use_count() == 1);
const std::string& column_name =
parent_variant.name_lower_case() + "." + subcolumn->path.get_path();
const vectorized::DataTypePtr& final_data_type_from_object =
subcolumn->data.get_least_common_type();
vectorized::PathInDataBuilder full_path_builder;
auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false)
.append(subcolumn->path.get_parts(), false)
.build();
TabletColumn tablet_column = vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {.unique_id = -1,
.parent_unique_id = parent_variant.unique_id(),
.path_info = full_path});

if (!is_sparse) {
to_append->append_column(std::move(tablet_column));
} else {
to_append->mutable_column_by_uid(parent_variant.unique_id())
.append_sparse_column(std::move(tablet_column));
}
}

// sort by paths in lexicographical order
vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
const vectorized::ColumnObject::Subcolumns& subcolumns) {
Expand All @@ -640,70 +581,12 @@ vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
return sorted;
}

void rebuild_schema_and_block(const TabletSchemaSPtr& original,
const std::vector<int>& variant_positions, Block& flush_block,
TabletSchemaSPtr& flush_schema) {
// rebuild schema and block with variant extracted columns

// 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
// those columns are extracted columns, leave none extracted columns remain in original variant column, which is
// JSONB format at present.
// 2. Collect columns that need to be added or modified when data type changes or new columns encountered
for (size_t variant_pos : variant_positions) {
auto column_ref = flush_block.get_by_position(variant_pos).column;
bool is_nullable = column_ref->is_nullable();
const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
remove_nullable(column_ref)->assume_mutable_ref());
const TabletColumn& parent_column = *original->columns()[variant_pos];
CHECK(object_column.is_finalized());
std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
// common extracted columns
for (const auto& entry : get_sorted_subcolumns(object_column.get_subcolumns())) {
if (entry->path.empty()) {
// root
root = entry;
continue;
}
_append_column(parent_column, entry, flush_schema, false);
const std::string& column_name =
parent_column.name_lower_case() + "." + entry->path.get_path();
flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(),
entry->data.get_least_common_type(), column_name});
}

// add sparse columns to flush_schema
for (const auto& entry : get_sorted_subcolumns(object_column.get_sparse_subcolumns())) {
_append_column(parent_column, entry, flush_schema, true);
}

// Create new variant column and set root column
auto obj = vectorized::ColumnObject::create(true, false);
// '{}' indicates a root path
static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
{}, root->data.get_finalized_column_ptr()->assume_mutable(),
root->data.get_least_common_type());
// // set for rowstore
if (original->has_row_store_for_all_columns()) {
static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column(
object_column.get_rowstore_column());
}
vectorized::ColumnPtr result = obj->get_ptr();
if (is_nullable) {
const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref)
.get_null_map_column_ptr();
result = vectorized::ColumnNullable::create(result, null_map);
}
flush_block.get_by_position(variant_pos).column = result;
vectorized::PathInDataBuilder full_root_path_builder;
auto full_root_path =
full_root_path_builder.append(parent_column.name_lower_case(), false).build();
TabletColumn new_col = flush_schema->column(variant_pos);
new_col.set_path_info(full_root_path);
flush_schema->replace_column(variant_pos, new_col);
VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
}
// ---------------------------

vectorized::schema_util::inherit_column_attributes(flush_schema);
std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
Block tmp;
tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
return tmp.dump_data(0, tmp.rows());
}

// ---------------------------
Expand Down Expand Up @@ -734,13 +617,5 @@ Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst)
->assume_mutable();
return Status::OK();
}
// ---------------------------

std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
Block tmp;
tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
return tmp.dump_data(0, tmp.rows());
}
// ---------------------------

} // namespace doris::vectorized::schema_util
8 changes: 0 additions & 8 deletions be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,6 @@ void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
const vectorized::ColumnObject::Subcolumns& subcolumns);

// Rebuild schema from original schema by extend dynamic columns generated from ColumnObject.
// Block consists of two parts, dynamic part of columns and static part of columns.
// static extracted
// | --------- | ----------- |
// The static ones are original tablet_schame columns
void rebuild_schema_and_block(const TabletSchemaSPtr& original, const std::vector<int>& variant_pos,
Block& flush_block, TabletSchemaSPtr& flush_schema);

// Extract json data from source with path
Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst);

Expand Down
10 changes: 6 additions & 4 deletions be/src/vec/data_types/serde/data_type_object_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@ void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr
const_cast<ColumnObject&>(variant).finalize();
}
result.writeKey(col_id);
std::string value_str;
if (!variant.serialize_one_row_to_string(row_num, &value_str)) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}",
variant.dump_structure());
}
JsonbParser json_parser;
CHECK(variant.get_rowstore_column() != nullptr);
// use original document
const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num);
// encode as jsonb
bool succ = json_parser.parse(data_ref.data, data_ref.size);
bool succ = json_parser.parse(value_str.data(), value_str.size());
// maybe more graceful, it is ok to check here since data could be parsed
CHECK(succ);
result.writeStartBinary();
Expand Down
4 changes: 2 additions & 2 deletions regression-test/data/variant_p0/delete_update.out
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

-- !sql --
2 {"updated_value":123} {"updated_value":123}
6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}
7 {"updated_value":1111} yyy

-- !sql --
2 {"updated_value":123} {"updated_value":123}
6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}

-- !sql --
1 "ddddddddddd" 1111 199 10 {"new_data1":1}
Expand Down
8 changes: 4 additions & 4 deletions regression-test/suites/variant_p0/delete_update.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true");
"""
sql "insert into var_delete_update_mow select k, cast(v as string), cast(v as string) from var_delete_update"
sql "delete from ${table_name} where k = 1"
sql "delete from ${table_name} where k in (select k from var_delete_update_mow where k in (3, 4, 5))"

sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.0}', 'xxx')"""
sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.0}', 'yyy')"""
sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.1}', 'xxx')"""
sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.1}', 'yyy')"""
sql """update var_delete_update_mow set vs = '{"updated_value" : 123}' where k = 6"""
sql """update var_delete_update_mow set v = '{"updated_value":1111}' where k = 7"""
qt_sql "select * from var_delete_update_mow order by k"
Expand Down Expand Up @@ -108,7 +108,7 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
`dft` int(11) DEFAULT "4321",
`var` variant NULL)
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true")
PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true", "store_row_column" = "true")
"""

sql """insert into ${tableName} values
Expand Down

0 comments on commit 1f06617

Please sign in to comment.