Skip to content

Commit

Permalink
[BugFix] Fix the bug of direct schema change (StarRocks#44854)
Browse files Browse the repository at this point in the history
Signed-off-by: trueeyu <[email protected]>
  • Loading branch information
trueeyu authored Apr 29, 2024
1 parent 0a360c9 commit 301445d
Show file tree
Hide file tree
Showing 11 changed files with 746 additions and 427 deletions.
40 changes: 18 additions & 22 deletions be/src/storage/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,8 @@ void HeapChunkMerger::_pop_heap() {
Status LinkedSchemaChange::process(TabletReader* reader, RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet, RowsetSharedPtr rowset,
TabletSchemaCSPtr base_tablet_schema) {
#ifndef BE_TEST
Status st = CurrentThread::mem_tracker()->check_mem_limit("LinkedSchemaChange");
if (!st.ok()) {
LOG(WARNING) << alter_msg_header() << "fail to execute schema change: " << st.message() << std::endl;
return st;
}
#endif
RETURN_IF_ERROR_WITH_WARN(CurrentThread::mem_tracker()->check_mem_limit("LinkedSchemaChange"),
fmt::format("{}fail to execute schema change", alter_msg_header()));

Status status =
new_rowset_writer->add_rowset_for_linked_schema_change(rowset, _chunk_changer->get_schema_mapping());
Expand Down Expand Up @@ -444,13 +439,9 @@ Status SchemaChangeDirectly::process(TabletReader* reader, RowsetWriter* new_row
return Status::InternalError(alter_msg_header() + "bg_worker_stopped");
}

#ifndef BE_TEST
st = CurrentThread::mem_tracker()->check_mem_limit("DirectSchemaChange");
if (!st.ok()) {
LOG(WARNING) << alter_msg_header() << "fail to execute schema change: " << st.message() << std::endl;
return st;
}
#endif
RETURN_IF_ERROR_WITH_WARN(CurrentThread::mem_tracker()->check_mem_limit("DirectSchemaChange"),
fmt::format("{}fail to execute schema change", alter_msg_header()));

if (st = reader->do_get_next(base_chunk.get()); !st.ok()) {
if (is_eos = st.is_end_of_file(); !is_eos) {
LOG(WARNING) << alter_msg_header()
Expand Down Expand Up @@ -644,7 +635,7 @@ Status SchemaChangeWithSorting::_internal_sorting(std::vector<ChunkPtr>& chunk_a
return Status::OK();
}

Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
Status SchemaChangeHandler::process_alter_tablet(const TAlterTabletReqV2& request) {
LOG(INFO) << _alter_msg_header << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
<< ", base_schema_hash=" << request.base_schema_hash << ", new_tablet_id=" << request.new_tablet_id
<< ", new_schema_hash=" << request.new_schema_hash << ", alter_version=" << request.alter_version;
Expand All @@ -662,14 +653,14 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req
DeferOp release_lock(
[&] { StorageEngine::instance()->tablet_manager()->release_schema_change_lock(request.base_tablet_id); });

Status status = _do_process_alter_tablet_v2(request);
Status status = _do_process_alter_tablet(request);
LOG(INFO) << _alter_msg_header << "finished alter tablet process, status=" << status.to_string()
<< " duration: " << timer.elapsed_time() / 1000000
<< "ms, peak_mem_usage: " << CurrentThread::mem_tracker()->peak_consumption() << " bytes";
return status;
}

Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& request) {
Status SchemaChangeHandler::_do_process_alter_tablet(const TAlterTabletReqV2& request) {
TabletSharedPtr base_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id);
if (base_tablet == nullptr) {
LOG(WARNING) << _alter_msg_header << "fail to find base tablet. base_tablet=" << request.base_tablet_id
Expand Down Expand Up @@ -817,14 +808,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
return Status::OK();
} else {
return _do_process_alter_tablet_v2_normal(request, sc_params, base_tablet, new_tablet);
return _do_process_alter_tablet_normal(request, sc_params, base_tablet, new_tablet);
}
}

Status SchemaChangeHandler::_do_process_alter_tablet_v2_normal(const TAlterTabletReqV2& request,
SchemaChangeParams& sc_params,
const TabletSharedPtr& base_tablet,
const TabletSharedPtr& new_tablet) {
Status SchemaChangeHandler::_do_process_alter_tablet_normal(const TAlterTabletReqV2& request,
SchemaChangeParams& sc_params,
const TabletSharedPtr& base_tablet,
const TabletSharedPtr& new_tablet) {
// begin to find deltas to convert from base tablet to new tablet so that
// obtain base tablet and new tablet's push lock and header write lock to prevent loading data
RowsetSharedPtr max_rowset;
Expand Down Expand Up @@ -921,6 +912,11 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2_normal(const TAlterTable
read_params.reader_type = ReaderType::READER_ALTER_TABLE;
read_params.skip_aggregation = false;
read_params.chunk_size = config::vector_chunk_size;
if (sc_params.sc_directly) {
// If the segments of a rowset overlap, we should use heap merge;
// otherwise the rows are not ordered by short key.
read_params.sorted_by_keys_per_tablet = true;
}

// open tablet readers out of lock for open is heavy because of io
for (auto& tablet_reader : readers) {
Expand Down
9 changes: 4 additions & 5 deletions be/src/storage/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,18 @@ class SchemaChangeHandler {
SchemaChangeHandler() = default;
~SchemaChangeHandler() = default;

// schema change v2, it will not set alter task in base tablet
Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
Status process_alter_tablet(const TAlterTabletReqV2& request);

void set_alter_msg_header(std::string msg) { _alter_msg_header = msg; }

private:
Status _get_versions_to_be_changed(const TabletSharedPtr& base_tablet,
std::vector<Version>* versions_to_be_changed);

Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);
Status _do_process_alter_tablet(const TAlterTabletReqV2& request);

Status _do_process_alter_tablet_v2_normal(const TAlterTabletReqV2& request, SchemaChangeParams& sc_params,
const TabletSharedPtr& base_tablet, const TabletSharedPtr& new_tablet);
Status _do_process_alter_tablet_normal(const TAlterTabletReqV2& request, SchemaChangeParams& sc_params,
const TabletSharedPtr& base_tablet, const TabletSharedPtr& new_tablet);

Status _validate_alter_result(const TabletSharedPtr& new_tablet, const TAlterTabletReqV2& request);

Expand Down
2 changes: 0 additions & 2 deletions be/src/storage/segment_replicate_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class PTabletInfo;
class FileSystem;
struct DeltaWriterOptions;

using DeltaWriterOptions = starrocks::DeltaWriterOptions;

class ReplicateChannel {
public:
ReplicateChannel(const DeltaWriterOptions* opt, std::string host, int32_t port, int64_t node_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/task/engine_alter_tablet_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Status EngineAlterTabletTask::execute() {
} else {
SchemaChangeHandler handler;
handler.set_alter_msg_header(alter_msg_header);
res = handler.process_alter_tablet_v2(_alter_tablet_req);
res = handler.process_alter_tablet(_alter_tablet_req);
}
if (!res.ok()) {
LOG(WARNING) << alter_msg_header << "failed to do alter task. status=" << res.to_string()
Expand Down
2 changes: 2 additions & 0 deletions be/src/testutil/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ add_library(TestUtil
function_utils.cpp
sync_point.cc
sync_point_impl.cc
schema_test_helper.cpp
tablet_test_helper.cpp
)

91 changes: 91 additions & 0 deletions be/src/testutil/schema_test_helper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "testutil/schema_test_helper.h"

namespace starrocks {
// Builds a DUP_KEYS tablet schema protobuf for tests.
//
// The schema carries `schema_id`, declares `num_key_cols` short key columns and
// contains `num_cols` nullable INT columns with an index length of 4. A column
// is flagged as a key only when its position is below `num_key_cols`; for the
// remaining columns the is_key field is left untouched.
//
// NOTE(review): every generated column is given the same name "c0"; only the
// unique id differs between columns. Confirm that callers do not need distinct
// column names.
TabletSchemaPB SchemaTestHelper::gen_schema_pb_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols,
                                                      size_t num_key_cols) {
    TabletSchemaPB result;
    result.set_id(schema_id);
    result.set_keys_type(DUP_KEYS);
    result.set_num_short_key_columns(num_key_cols);

    for (size_t col_idx = 0; col_idx != num_cols; ++col_idx) {
        auto* column = result.add_column();
        column->set_unique_id(col_idx);
        column->set_name("c0");
        column->set_type("INT");
        column->set_is_nullable(true);
        column->set_index_length(4);

        const bool belongs_to_key = col_idx < num_key_cols;
        if (belongs_to_key) {
            column->set_is_key(true);
        }
    }

    return result;
}

// Returns a thrift column description named `col_name` of primitive type
// `type`, with the is_key flag set to true.
TColumn SchemaTestHelper::gen_key_column(const std::string& col_name, TPrimitiveType::type type) {
    TColumnType key_type;
    key_type.type = type;

    TColumn key_column;
    key_column.__set_is_key(true);
    key_column.__set_column_type(key_type);
    key_column.__set_column_name(col_name);
    return key_column;
}

// Returns a thrift column description named `col_name` of primitive type
// `type`, with the is_key flag set to false. No aggregation type is set.
TColumn SchemaTestHelper::gen_value_column_for_dup_table(const std::string& col_name, TPrimitiveType::type type) {
    TColumnType value_type;
    value_type.type = type;

    TColumn value_column;
    value_column.__set_is_key(false);
    value_column.__set_column_type(value_type);
    value_column.__set_column_name(col_name);
    return value_column;
}

// Returns a thrift column description named `col_name` of primitive type
// `type`, with the is_key flag set to false and the aggregation type fixed to
// TAggregationType::SUM.
TColumn SchemaTestHelper::gen_value_column_for_agg_table(const std::string& col_name, TPrimitiveType::type type) {
    TColumnType value_type;
    value_type.type = type;

    TColumn agg_column;
    agg_column.__set_aggregation_type(TAggregationType::SUM);
    agg_column.__set_is_key(false);
    agg_column.__set_column_type(value_type);
    agg_column.__set_column_name(col_name);
    return agg_column;
}

// Appends one non-key, non-nullable column to `tablet_schema_pb`.
//
// The new column takes `name`, `type`, `agg` (aggregation) and `length` from
// the arguments.
//
// NOTE(review): the unique id is always set to 0, so repeated calls produce
// columns that share a unique id. Confirm this is acceptable for the callers.
void SchemaTestHelper::add_column_pb_to_tablet_schema(TabletSchemaPB* tablet_schema_pb, const std::string& name,
                                                      const std::string& type, const std::string& agg,
                                                      uint32_t length) {
    auto* appended = tablet_schema_pb->add_column();

    // Identity of the column.
    appended->set_unique_id(0);
    appended->set_name(name);
    appended->set_type(type);

    // Fixed flags: always a non-key, non-nullable column.
    appended->set_is_key(false);
    appended->set_is_nullable(false);

    // Caller-provided storage attributes.
    appended->set_length(length);
    appended->set_aggregation(agg);
}

} // namespace starrocks
30 changes: 30 additions & 0 deletions be/src/testutil/schema_test_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "gen_cpp/tablet_schema.pb.h"
#include "storage/tablet_schema.h"

namespace starrocks {
// Static helpers that build schema-related protobuf and thrift objects for
// unit tests.
class SchemaTestHelper {
public:
// Builds a DUP_KEYS TabletSchemaPB with id `schema_id` and `num_cols` nullable
// INT columns; the first `num_key_cols` columns are flagged as key columns and
// `num_key_cols` is also used as the number of short key columns.
// NOTE(review): all generated columns share the name "c0".
static TabletSchemaPB gen_schema_pb_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols, size_t num_key_cols);
// Builds a TColumn with the given name and primitive type, flagged as a key.
static TColumn gen_key_column(const std::string& col_name, TPrimitiveType::type type);
// Builds a non-key TColumn with the given name and primitive type; no
// aggregation type is set.
static TColumn gen_value_column_for_dup_table(const std::string& col_name, TPrimitiveType::type type);
// Builds a non-key TColumn with the given name and primitive type whose
// aggregation type is TAggregationType::SUM.
static TColumn gen_value_column_for_agg_table(const std::string& col_name, TPrimitiveType::type type);
// Appends a non-key, non-nullable column with the given name, type,
// aggregation and length to `tablet_schema_pb`. The unique id of the appended
// column is always 0.
static void add_column_pb_to_tablet_schema(TabletSchemaPB* tablet_schema_pb, const std::string& name,
const std::string& type, const std::string& agg, uint32_t length);
};
} // namespace starrocks
94 changes: 94 additions & 0 deletions be/src/testutil/tablet_test_helper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "testutil/tablet_test_helper.h"

#include "storage/chunk_helper.h"
#include "storage/rowset/rowset_factory.h"
#include "storage/tablet.h"

namespace starrocks {
// Builds a create-tablet request for tests.
//
// The request targets `tablet_id` at version 1 with version hash 0. Its tablet
// schema uses schema hash 0, two short key columns, and the keys type and
// storage type supplied by the caller. No columns are added here.
TCreateTabletReq TabletTestHelper::gen_create_tablet_req(TTabletId tablet_id, TKeysType::type type,
                                                         TStorageType::type storage_type) {
    TCreateTabletReq request;
    request.tablet_id = tablet_id;
    request.__set_version(1);
    request.__set_version_hash(0);

    auto& schema = request.tablet_schema;
    schema.schema_hash = 0;
    schema.short_key_column_count = 2;
    schema.keys_type = type;
    schema.storage_type = storage_type;

    return request;
}

std::shared_ptr<RowsetWriter> TabletTestHelper::create_rowset_writer(const Tablet& tablet, RowsetId rowset_id,
Version version) {
RowsetWriterContext writer_context;
writer_context.rowset_id = rowset_id;
writer_context.tablet_uid = tablet.tablet_uid();
writer_context.tablet_id = tablet.tablet_id();
writer_context.tablet_schema_hash = tablet.schema_hash();
writer_context.rowset_path_prefix = tablet.schema_hash_path();
writer_context.tablet_schema = tablet.tablet_schema();
writer_context.rowset_state = VISIBLE;
writer_context.version = version;
std::unique_ptr<RowsetWriter> rowset_writer;
auto st = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
CHECK(st.ok());
return rowset_writer;
}

std::shared_ptr<TabletReader> TabletTestHelper::create_rowset_reader(const TabletSharedPtr& tablet,
const Schema& schema, Version version) {
TabletReaderParams read_params;
read_params.reader_type = ReaderType::READER_ALTER_TABLE;
read_params.skip_aggregation = false;
read_params.chunk_size = config::vector_chunk_size;
auto tablet_reader = std::make_unique<TabletReader>(tablet, version, schema);

CHECK(tablet_reader != nullptr);
CHECK(tablet_reader->prepare().ok());
CHECK(tablet_reader->open(read_params).ok());

return tablet_reader;
}

// Opens a DeltaWriter for `tablet_id` acting as the primary replica, using
// transaction id 1 and partition id 1.
//
// The options keep the address of `slots` rather than a copy.
// NOTE(review): presumably `slots` must outlive the returned writer; verify
// against DeltaWriter. The process is aborted via CHECK if opening fails.
std::shared_ptr<DeltaWriter> TabletTestHelper::create_delta_writer(TTabletId tablet_id,
                                                                   const std::vector<SlotDescriptor*>& slots,
                                                                   MemTracker* mem_tracker) {
    DeltaWriterOptions writer_opts;
    writer_opts.replica_state = ReplicaState::Primary;
    writer_opts.partition_id = 1;
    writer_opts.txn_id = 1;
    writer_opts.tablet_id = tablet_id;
    writer_opts.slots = &slots;

    auto writer_or = DeltaWriter::open(writer_opts, mem_tracker);
    CHECK(writer_or.ok());
    return std::move(writer_or.value());
}

std::vector<ChunkIteratorPtr> TabletTestHelper::create_segment_iterators(const Tablet& tablet, Version version,
OlapReaderStatistics* _stats) {
auto new_rowset = tablet.get_rowset_by_version(version);
CHECK(new_rowset != nullptr);

std::vector<ChunkIteratorPtr> seg_iters;
RowsetReadOptions rowset_opts;
rowset_opts.version = version.second;
rowset_opts.stats = _stats;
auto st = new_rowset->get_segment_iterators(*tablet.tablet_schema()->schema(), rowset_opts, &seg_iters);
CHECK(st.ok());
return seg_iters;
}
} // namespace starrocks
37 changes: 37 additions & 0 deletions be/src/testutil/tablet_test_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "gen_cpp/AgentService_types.h"
#include "storage/delta_writer.h"
#include "storage/rowset/rowset_writer.h"
#include "storage/tablet_reader.h"

namespace starrocks {
// Static helpers that build tablet-related requests, readers and writers for
// unit tests. The create_* helpers abort the process via CHECK on failure
// instead of returning an error.
class TabletTestHelper {
public:
// Builds a create-tablet request for `tablet_id` at version 1 whose schema has
// schema hash 0, two short key columns and the given keys/storage types. No
// columns are added to the schema.
static TCreateTabletReq gen_create_tablet_req(TTabletId tablet_id, TKeysType::type type,
TStorageType::type storage_type);
// Creates a writer for a VISIBLE rowset with the given id and version, using
// the uid, id, schema hash, path prefix and schema of `tablet`.
static std::shared_ptr<RowsetWriter> create_rowset_writer(const Tablet& tablet, RowsetId rowset_id,
Version version);
// Creates, prepares and opens a TabletReader over `tablet` at `version` with
// reader type READER_ALTER_TABLE and aggregation enabled.
static std::shared_ptr<TabletReader> create_rowset_reader(const TabletSharedPtr& tablet, const Schema& schema,
Version version);
// Opens a primary-replica DeltaWriter for `tablet_id` with txn id 1 and
// partition id 1. The writer options store the address of `slots`.
// NOTE(review): presumably `slots` must outlive the writer; verify.
static std::shared_ptr<DeltaWriter> create_delta_writer(TTabletId tablet_id,
const std::vector<SlotDescriptor*>& slots,
MemTracker* mem_tracker);
// Returns the segment iterators of the rowset of `tablet` matching `version`,
// read with the tablet's schema; read statistics are recorded in `stats`.
static std::vector<ChunkIteratorPtr> create_segment_iterators(const Tablet& tablet, Version version,
OlapReaderStatistics* stats);
};
} // namespace starrocks
Loading

0 comments on commit 301445d

Please sign in to comment.