Skip to content

Commit

Permalink
Fix alter (#1987)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix some bugs that occur when restarting after altering a table.
Add a test case to reproduce the bug.

Issue links: #1967, #1989

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored Oct 9, 2024
1 parent 065172d commit 3f2a0f5
Show file tree
Hide file tree
Showing 18 changed files with 213 additions and 96 deletions.
1 change: 1 addition & 0 deletions benchmark/local_infinity/sparse/sparse_benchmark_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import infinity_exception;
import sparse_util;
import compilation_config;
import bmp_util;
import local_file_handle;

using namespace infinity;

Expand Down
27 changes: 23 additions & 4 deletions python/restart_test/test_alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,18 @@ def part2(infinity_obj):
part1()
part2()

def test_alter_with_deltalog(self, infinity_runner: InfinityRunner):
@pytest.mark.parametrize(
"config, sleep, flush_mid",
[
("test/data/config/restart_test/test_alter/1.toml", 0, False),
("test/data/config/restart_test/test_alter/2.toml", 2, False),
("test/data/config/restart_test/test_alter/2.toml", 2, True),
],
)
def test_alter_complex(
self, infinity_runner: InfinityRunner, config: str, sleep: int, flush_mid: bool
):
table_name = "test_alter2"
config = "test/data/config/restart_test/test_alter/2.toml"

infinity_runner.clear()
uri = common_values.TEST_LOCAL_HOST
Expand All @@ -102,6 +111,10 @@ def part1(infinity_obj):

table_obj.insert([{"c1": 1, "c2": 2, "c3": "test"}])

if sleep > 0 and flush_mid:
print(f"sleep {sleep} seconds")
time.sleep(sleep) # wait for delta log flush

res = table_obj.add_columns({"c4": {"type": "varchar", "default": "tttt"}})
assert res.error_code == ErrorCode.OK

Expand All @@ -116,7 +129,9 @@ def part1(infinity_obj):
res = table_obj.add_columns({"c5": {"type": "int", "default": 0}})
assert res.error_code == ErrorCode.OK

time.sleep(2) # wait for delta log flush
if sleep > 0:
print(f"sleep {sleep} seconds")
time.sleep(sleep) # wait for delta log flush

@decorator
def part2(infinity_obj):
Expand All @@ -143,11 +158,15 @@ def part2(infinity_obj):
}
),
)
dropped_column_dirs = pathlib.Path("/var/infinity/data").rglob("1.col")
assert len(list(dropped_column_dirs)) == 0

db_obj.drop_table(table_name)

part1()
part2()

def test_alter_cleanup(self, infinity_runner: InfinityRunner):
def test_alter_cleanup_simple(self, infinity_runner: InfinityRunner):
table_name = "test_alter3"
config = "test/data/config/restart_test/test_alter/3.toml"

Expand Down
80 changes: 79 additions & 1 deletion python/test_pysdk/test_alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,84 @@ def test_simple_drop_columns(self):

db_obj.drop_table(table_name)

def test_insert_after_drop_columns(self):
    """Verify that inserts work after adding and then dropping a column.

    Scenario: create a 3-column table, add three columns (column_name1..3),
    drop column_name1, then insert rows that only specify the original
    columns. The added-but-not-dropped columns (column_name2, column_name3)
    must be filled with their declared defaults, and the dropped column
    must not reappear in the output.
    """
    table_name = "testing_table" + self.suffix
    db_obj = self.infinity_obj.get_database("default_db")

    # Ensure a clean slate; ignore "table does not exist".
    db_obj.drop_table(table_name, infinity.common.ConflictType.Ignore)

    table_obj = db_obj.create_table(
        table_name,
        {
            "num": {"type": "integer"},
            "body": {"type": "varchar"},
            "vec": {"type": "vector,4,float"},
        },
    )

    # Add three columns with defaults, then immediately drop one of them.
    table_obj.add_columns(
        {
            "column_name1": {"type": "integer", "default": 0},
            "column_name2": {"type": "float", "default": 0.0},
            "column_name3": {"type": "varchar", "default": ""},
        }
    )
    table_obj.drop_columns(["column_name1"])
    # Insert specifies only the original columns; the surviving added
    # columns must be populated from their defaults.
    table_obj.insert(
        [
            {
                "num": 1,
                "body": "unnecessary and harmful",
                "vec": [1.0, 1.2, 0.8, 0.9],
            },
            {
                "num": 2,
                "body": "Office for Harmful Blooms",
                "vec": [4.0, 4.2, 4.3, 4.5],
            },
            {
                "num": 3,
                "body": "A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set.",
                "vec": [4.0, 4.2, 4.3, 4.2],
            },
        ]
    )

    result = table_obj.output(["*"]).to_df()
    print(result)
    # Expected frame: dropped column_name1 absent; column_name2/3 hold
    # their defaults for every inserted row.
    pd.testing.assert_frame_equal(
        result,
        pd.DataFrame(
            {
                "num": [1, 2, 3],
                "body": [
                    "unnecessary and harmful",
                    "Office for Harmful Blooms",
                    "A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set.",
                ],
                "vec": [
                    [1.0, 1.2, 0.8, 0.9],
                    [4.0, 4.2, 4.3, 4.5],
                    [4.0, 4.2, 4.3, 4.2],
                ],
                "column_name2": [0.0, 0.0, 0.0],
                "column_name3": ["", "", ""],
            }
        ).astype(
            {
                "num": dtype("int32"),
                "body": dtype("object"),
                "vec": dtype("object"),
                "column_name2": dtype("float32"),
                "column_name3": dtype("object"),
            }
        ),
    )
    db_obj.drop_table(table_name)
    # NOTE(review): disconnecting the shared connection inside a single
    # test looks unusual — sibling tests in this class only drop their
    # table. Presumably intentional for this test file's fixture setup;
    # TODO confirm it does not break tests that run afterwards.
    self.infinity_obj.disconnect()

    print("test done")

def test_add_drop_column_with_index(self):
table_name = "test_add_drop_column_with_index" + self.suffix
db_obj = self.infinity_obj.get_database("default_db")
Expand Down Expand Up @@ -194,4 +272,4 @@ def test_add_drop_column_with_index(self):
),
)

db_obj.drop_table(table_name)
db_obj.drop_table(table_name)
40 changes: 1 addition & 39 deletions src/executor/operator/physical_alter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,7 @@ import txn;
import status;
import infinity_exception;
import value;
import bind_context;
import value_expression;
import expression_binder;
import defer_op;
import cast_function;
import bound_cast_func;
import base_expression;
import cast_expression;
import expression_evaluator;
import column_vector;
import expression_state;

namespace infinity {

Expand All @@ -61,35 +51,7 @@ bool PhysicalAddColumns::Execute(QueryContext *query_context, OperatorState *ope

Txn *txn = query_context->GetTxn();

ExpressionBinder tmp_binder(nullptr);
Vector<Value> values;
for (const auto &column_def : column_defs_) {
if (!column_def->has_default_value()) {
UnrecoverableError(fmt::format("Column {} has no default value", column_def->name()));
}
SharedPtr<ConstantExpr> default_expr = column_def->default_value();
auto expr = tmp_binder.BuildValueExpr(*default_expr, nullptr, 0, false);
auto *value_expr = static_cast<ValueExpression *>(expr.get());

const SharedPtr<DataType> &column_type = column_def->type();
if (value_expr->Type() == *column_type) {
values.push_back(value_expr->GetValue());
} else {
const SharedPtr<DataType> &column_type = column_def->type();

BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *column_type);
SharedPtr<BaseExpression> cast_expr = MakeShared<CastExpression>(cast, expr, *column_type);
SharedPtr<ExpressionState> expr_state = ExpressionState::CreateState(cast_expr);
SharedPtr<ColumnVector> output_column_vector = ColumnVector::Make(column_type);
output_column_vector->Initialize(ColumnVectorType::kConstant, 1);
ExpressionEvaluator evaluator;
evaluator.Init(nullptr);
evaluator.Execute(cast_expr, expr_state, output_column_vector);

values.push_back(output_column_vector->GetValue(0));
}
}
auto status = txn->AddColumns(table_entry_, column_defs_, values);
auto status = txn->AddColumns(table_entry_, column_defs_);
if (!status.ok()) {
RecoverableError(status);
}
Expand Down
6 changes: 5 additions & 1 deletion src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,11 @@ void BufferObj::SubObjRc() {
if (obj_rc_ == 0) {
UnrecoverableError(fmt::format("SubObjRc: obj_rc_ is 0, buffer: {}", GetFilename()));
}
obj_rc_--;
--obj_rc_;
if (obj_rc_ == 0) {
status_ = BufferStatus::kClean;
buffer_mgr_->AddToCleanList(this, false/*do_free*/);
}
}

void BufferObj::CheckState() const {
Expand Down
20 changes: 16 additions & 4 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
auto db_name = String(decodes[0]);
auto table_name = MakeShared<String>(decodes[1]);
const auto &table_entry_dir = add_table_entry_op->table_entry_dir_;
auto column_defs = add_table_entry_op->column_defs_;
const auto &column_defs = add_table_entry_op->column_defs_;
auto entry_type = add_table_entry_op->table_entry_type_;
auto row_count = add_table_entry_op->row_count_;
SegmentID unsealed_id = add_table_entry_op->unsealed_id_;
Expand Down Expand Up @@ -792,9 +792,21 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
auto *table_entry = db_entry->GetTableReplay(table_name, txn_id, begin_ts);
auto *segment_entry = table_entry->segment_map_.at(segment_id).get();
auto *block_entry = segment_entry->GetBlockEntryByID(block_id).get();
block_entry->AddColumnReplay(
BlockColumnEntry::NewReplayBlockColumnEntry(block_entry, column_id, buffer_mgr, next_outline_idx, last_chunk_offset, commit_ts),
column_id);
if (merge_flag == MergeFlag::kDelete) {
block_entry->DropColumnReplay(column_id);
} else if (merge_flag == MergeFlag::kNew) {
block_entry->AddColumnReplay(BlockColumnEntry::NewReplayBlockColumnEntry(block_entry,
column_id,
buffer_mgr,
next_outline_idx,
last_chunk_offset,
commit_ts),
column_id);
} else if (merge_flag == MergeFlag::kUpdate) {
// do nothing
} else {
UnrecoverableError(fmt::format("Unsupported merge flag {} for column entry {}", (i8)merge_flag, column_id));
}
break;
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ void BlockColumnEntry::FlushColumn(TxnTimeStamp checkpoint_ts) {
BlockColumnEntry::Flush(this, 0, row_cnt);
}

void BlockColumnEntry::DropColumn() {
void BlockColumnEntry::DropColumn() {
buffer_->SubObjRc();
for (auto *outline_buffer : outline_buffers_) {
outline_buffer->SubObjRc();
Expand Down
16 changes: 15 additions & 1 deletion src/storage/meta/entry/block_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,17 @@ UniquePtr<BlockEntry> BlockEntry::Clone(SegmentEntry *segment_entry) const {

UniquePtr<BlockEntry>
BlockEntry::NewBlockEntry(const SegmentEntry *segment_entry, BlockID block_id, TxnTimeStamp checkpoint_ts, u64 column_count, Txn *txn) {
const TableEntry *table_entry = segment_entry->GetTableEntry();
auto block_entry = MakeUnique<BlockEntry>(segment_entry, block_id, checkpoint_ts);

block_entry->begin_ts_ = txn->BeginTS();
block_entry->txn_id_ = txn->TxnID();

block_entry->block_dir_ = BlockEntry::DetermineDir(*segment_entry->segment_dir(), block_id);
block_entry->columns_.reserve(column_count);
for (SizeT column_id = 0; column_id < column_count; ++column_id) {
for (SizeT column_idx = 0; column_idx < column_count; ++column_idx) {
const SharedPtr<ColumnDef> column_def = table_entry->column_defs()[column_idx];
ColumnID column_id = column_def->id();
auto column_entry = BlockColumnEntry::NewBlockColumnEntry(block_entry.get(), column_id, txn);
block_entry->columns_.emplace_back(std::move(column_entry));
}
Expand Down Expand Up @@ -594,6 +597,17 @@ void BlockEntry::AddColumnReplay(UniquePtr<BlockColumnEntry> column_entry, Colum
}
}

// Replays a "drop column" delta-log operation on this block during recovery:
// releases the column's buffer objects (via DropColumn) and removes the
// column entry from this block. Aborts if the column id is unknown, which
// indicates a corrupted or inconsistent delta log.
void BlockEntry::DropColumnReplay(ColumnID column_id) {
    // Columns are matched by their ColumnID, not by vector index: after
    // prior add/drop operations the position in `columns_` no longer equals
    // the column id.
    auto iter = std::find_if(columns_.begin(), columns_.end(), [&](const auto &column) { return column->column_id() == column_id; });
    if (iter == columns_.end()) {
        // Fixed: the message previously named AddColumnReplay, which would
        // mislead anyone debugging a failed replay.
        String error_message = fmt::format("BlockEntry::DropColumnReplay: column_id {} not found", column_id);
        UnrecoverableError(error_message);
    }
    BlockColumnEntry *entry = iter->get();
    // Release the column's buffers before erasing the owning entry.
    entry->DropColumn();
    columns_.erase(iter);
}

void BlockEntry::AppendBlock(const Vector<ColumnVector> &column_vectors, SizeT row_begin, SizeT read_size, BufferManager *buffer_mgr) {
if (read_size + block_row_count_ > row_capacity_) {
String error_message = "BlockEntry::AppendBlock: read_size + row_count_ > row_capacity_";
Expand Down
4 changes: 3 additions & 1 deletion src/storage/meta/entry/block_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public:

void AddColumnReplay(UniquePtr<BlockColumnEntry> column_entry, ColumnID column_id);

void DropColumnReplay(ColumnID column_id);

void AppendBlock(const Vector<ColumnVector> &column_vectors, SizeT row_begin, SizeT read_size, BufferManager *buffer_mgr);

void Cleanup(CleanupInfoTracer *info_tracer = nullptr, bool dropped = true) override;
Expand Down Expand Up @@ -146,7 +148,7 @@ public:
// Relative to the `data_dir` config item
const SharedPtr<String> &block_dir() const { return block_dir_; }

BlockColumnEntry *GetColumnBlockEntry(SizeT column_id) const { return columns_[column_id].get(); }
BlockColumnEntry *GetColumnBlockEntry(SizeT column_idx) const { return columns_[column_idx].get(); }

FastRoughFilter *GetFastRoughFilter() { return fast_rough_filter_.get(); }

Expand Down
43 changes: 41 additions & 2 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ import infinity_context;
import persistence_manager;
import bg_task;
import defer_op;
import bind_context;
import value_expression;
import expression_binder;
import cast_function;
import bound_cast_func;
import base_expression;
import cast_expression;
import expression_evaluator;
import column_vector;
import expression_state;

namespace infinity {

Expand Down Expand Up @@ -256,6 +266,7 @@ void TableEntry::UpdateEntryReplay(const SharedPtr<TableEntry> &table_entry) {
txn_id_ = table_entry->txn_id_;
begin_ts_ = table_entry->begin_ts_;
commit_ts_.store(table_entry->commit_ts_);
columns_ = table_entry->columns_;
row_count_ = table_entry->row_count();
unsealed_id_ = table_entry->unsealed_id();
next_segment_id_ = table_entry->next_segment_id();
Expand Down Expand Up @@ -1222,7 +1233,7 @@ UniquePtr<TableEntry> TableEntry::Deserialize(const nlohmann::json &table_entry_
SegmentID unsealed_id = table_entry_json["unsealed_id"];
SegmentID next_segment_id = table_entry_json["next_segment_id"];

if(!table_entry_json.contains("next_column_id")) {
if (!table_entry_json.contains("next_column_id")) {
String error_message = "No 'next_column_id in table entry of catalog file, maybe your catalog is generated before 0.4.0.'";
UnrecoverableError(error_message);
}
Expand Down Expand Up @@ -1451,7 +1462,35 @@ void TableEntry::SetUnlock() {
locked_ = false;
}

void TableEntry::AddColumns(const Vector<SharedPtr<ColumnDef>> &column_defs, const Vector<Value> &default_values, TxnTableStore *txn_table_store) {
void TableEntry::AddColumns(const Vector<SharedPtr<ColumnDef>> &column_defs, TxnTableStore *txn_table_store) {
ExpressionBinder tmp_binder(nullptr);
Vector<Value> default_values;
for (const auto &column_def : column_defs) {
if (!column_def->has_default_value()) {
UnrecoverableError(fmt::format("Column {} has no default value", column_def->name()));
}
SharedPtr<ConstantExpr> default_expr = column_def->default_value();
auto expr = tmp_binder.BuildValueExpr(*default_expr, nullptr, 0, false);
auto *value_expr = static_cast<ValueExpression *>(expr.get());

const SharedPtr<DataType> &column_type = column_def->type();
if (value_expr->Type() == *column_type) {
default_values.push_back(value_expr->GetValue());
} else {
const SharedPtr<DataType> &column_type = column_def->type();

BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *column_type);
SharedPtr<BaseExpression> cast_expr = MakeShared<CastExpression>(cast, expr, *column_type);
SharedPtr<ExpressionState> expr_state = ExpressionState::CreateState(cast_expr);
SharedPtr<ColumnVector> output_column_vector = ColumnVector::Make(column_type);
output_column_vector->Initialize(ColumnVectorType::kConstant, 1);
ExpressionEvaluator evaluator;
evaluator.Init(nullptr);
evaluator.Execute(cast_expr, expr_state, output_column_vector);

default_values.push_back(output_column_vector->GetValue(0));
}
}
Vector<Pair<ColumnID, const Value *>> columns_info;
for (SizeT idx = 0; idx < column_defs.size(); ++idx) {
const auto &column_def = column_defs[idx];
Expand Down
Loading

0 comments on commit 3f2a0f5

Please sign in to comment.