Skip to content

Commit

Permalink
[FEAT MERGE] transfer without kill tx
Browse files Browse the repository at this point in the history
Co-authored-by: Minionyh <[email protected]>
Co-authored-by: KyrielightWei <[email protected]>
  • Loading branch information
3 people authored and ob-robot committed Dec 8, 2023
1 parent 17d06c0 commit ff0ec7e
Show file tree
Hide file tree
Showing 125 changed files with 10,818 additions and 1,118 deletions.
5 changes: 1 addition & 4 deletions deps/oblib/src/common/ob_common_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,9 @@ struct ObQueryFlag
inline void set_use_fast_agg() { use_fast_agg_ = UseFastAgg; }
inline void set_iter_uncommitted_row() { iter_uncommitted_row_ = true; }
inline void set_not_iter_uncommitted_row() { iter_uncommitted_row_ = false; }
inline void set_for_foreign_key_check() { for_foreign_key_check_ = true; }
inline void set_ignore_trans_stat() { ignore_trans_stat_ = true; }
inline void set_not_ignore_trans_stat() { ignore_trans_stat_ = false; }
inline bool iter_uncommitted_row() const { return iter_uncommitted_row_; }
inline void set_for_foreign_key_check() { for_foreign_key_check_ = true; }
inline bool is_for_foreign_key_check() const { return for_foreign_key_check_; }
inline bool is_ignore_trans_stat() const { return ignore_trans_stat_; }
inline bool is_sstable_cut() const { return is_sstable_cut_; }
inline bool is_skip_read_lob() const { return skip_read_lob_; }
inline void disable_cache()
Expand Down
2 changes: 2 additions & 0 deletions mittest/mtlenv/mock_tenant_module_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ int MockTenantModuleEnv::init()
if (inited_) {
ret = OB_INIT_TWICE;
STORAGE_LOG(ERROR, "init twice", K(ret));
} else if (OB_FAIL(ObClockGenerator::init())) {
STORAGE_LOG(ERROR, "init ClockGenerator failed", K(ret));
} else if (FALSE_IT(init_gctx_gconf())) {
} else if (OB_FAIL(init_before_start_mtl())) {
STORAGE_LOG(ERROR, "init_before_start_mtl failed", K(ret));
Expand Down
74 changes: 74 additions & 0 deletions mittest/mtlenv/storage/test_trans.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#define USING_LOG_PREFIX STORAGE
#include <gtest/gtest.h>
#include <thread>
#include "mtlenv/mock_tenant_module_env.h"
#include "storage/mockcontainer/mock_ob_iterator.h"
#include "storage/mockcontainer/mock_ob_end_trans_callback.h"
Expand Down Expand Up @@ -365,6 +366,79 @@ TEST_F(TestTrans, freeze)
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze());
}
*/
TEST_F(TestTrans, transfer_block)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = MTL_ID();
ObLSID ls_id(100);
ObTabletID tablet_id(1001);

LOG_INFO("start transaction");
ObTxDesc *tx_desc = NULL;
ObTxReadSnapshot snapshot;
prepare_tx_desc(tx_desc, snapshot);
// prepare insert param
const char *ins_str =
"bigint dml \n"
"300 T_DML_INSERT \n";
insert_rows(ls_id, tablet_id, *tx_desc, snapshot, ins_str);

ObTransService *tx_service = MTL(ObTransService*);
ObPartTransCtx *part_ctx;
ASSERT_EQ(OB_SUCCESS, tx_service->tx_ctx_mgr_.get_tx_ctx(ls_id, tx_desc->tx_id_, false, part_ctx));
part_ctx->sub_state_.set_transfer_blocking();
ASSERT_EQ(OB_SUCCESS, tx_service->tx_ctx_mgr_.revert_tx_ctx(part_ctx));

std::thread th([part_ctx] () {
::sleep(3);
part_ctx->sub_state_.clear_transfer_blocking();
});

LOG_INFO("commit transaction");
ASSERT_EQ(OB_SUCCESS, tx_service->commit_tx(*tx_desc, ObTimeUtility::current_time() + 100000000));

LOG_INFO("release transaction");
tx_service->release_tx(*tx_desc);

th.join();
}

TEST_F(TestTrans, transfer_block2)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = MTL_ID();
ObLSID ls_id(100);
ObTabletID tablet_id(1001);

LOG_INFO("start transaction");
ObTxDesc *tx_desc = NULL;
ObTxReadSnapshot snapshot;
prepare_tx_desc(tx_desc, snapshot);
// prepare insert param
const char *ins_str =
"bigint dml \n"
"400 T_DML_INSERT \n";
insert_rows(ls_id, tablet_id, *tx_desc, snapshot, ins_str);

ObTransService *tx_service = MTL(ObTransService*);
ObPartTransCtx *part_ctx;
ASSERT_EQ(OB_SUCCESS, tx_service->tx_ctx_mgr_.get_tx_ctx(ls_id, tx_desc->tx_id_, false, part_ctx));
bool is_blocked = false;
part_ctx->sub_state_.set_transfer_blocking();
ASSERT_EQ(OB_SUCCESS, tx_service->tx_ctx_mgr_.revert_tx_ctx(part_ctx));

std::thread th([part_ctx] () {
::sleep(3);
part_ctx->sub_state_.clear_transfer_blocking();
});

LOG_INFO("rollback transaction");
ASSERT_EQ(OB_SUCCESS, tx_service->rollback_tx(*tx_desc));

LOG_INFO("release transaction");
tx_service->release_tx(*tx_desc);
th.join();
}

TEST_F(TestTrans, remove_ls)
{
Expand Down
4 changes: 1 addition & 3 deletions mittest/mtlenv/test_tx_data_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,9 +750,7 @@ int main(int argc, char **argv)
// TEST_LOG("GCONF.syslog_io_bandwidth_limit %ld ", GCONF.syslog_io_bandwidth_limit.get_value());
// LOG_INFO("GCONF.syslog_io_bandwidth_limit ", K(GCONF.syslog_io_bandwidth_limit.get_value()));

if (OB_SUCCESS != ObClockGenerator::init()) {
TRANS_LOG(WARN, "ObClockGenerator::init error!");
} else {
{
if (argc > 1) {
const_data_num = atoi(argv[1]);
} else {
Expand Down
4 changes: 2 additions & 2 deletions mittest/multi_replica/test_ob_dup_table_restart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), become_leader_after_restart)
transaction::ObPartTransCtx *tx_ctx = nullptr;
ASSERT_EQ(OB_SUCCESS,
ls_handle.get_ls()->get_tx_ctx(transaction::ObTransID(update_tx_id), false, tx_ctx));
share::ObLSArray fake_parts;
ASSERT_EQ(OB_SUCCESS, fake_parts.push_back(share::ObLSID(static_basic_arg_.ls_id_num_)));
ObTxCommitParts fake_parts;
ASSERT_EQ(OB_SUCCESS, fake_parts.push_back(ObTxExecPart(share::ObLSID(static_basic_arg_.ls_id_num_), -1, -1)));
tx_ctx->set_2pc_participants_(fake_parts);
tx_ctx->submit_redo_commit_info_log_();
RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 20 * 1000 * 1000, 100 * 1000);
Expand Down
45 changes: 25 additions & 20 deletions mittest/simple_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ set(OBSERVER_TEST_SRCS
env/ob_simple_server.cpp
env/ob_simple_server_restart_helper.cpp
env/ob_simple_cluster_test_base.cpp
env/ob_simple_server_helper.cpp
)

add_library(observer_test ${OBSERVER_TEST_SRCS})
Expand Down Expand Up @@ -29,27 +30,31 @@ function(errsim_ha_unittest_observer case)
target_link_libraries(${case} PRIVATE gtest gmock observer_test oceanbase)
endfunction()

add_executable(test_simple_ob
EXCLUDE_FROM_ALL
test_ob_simple_cluster.cpp
env/ob_simple_server.cpp
env/ob_simple_server_restart_helper.cpp
env/ob_simple_cluster_test_base.cpp
)
target_include_directories(test_simple_ob PUBLIC
${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest)
target_link_libraries(test_simple_ob
PRIVATE
-Wl,--start-group
oceanbase_static
ob_sql_static
ob_storage_static
-Wl,--end-group
-static-libgcc
-static-libstdc++
gtest
gmock)
function(ob_offline_observer case case_file)
add_executable(${case}
EXCLUDE_FROM_ALL
${case_file}
${OBSERVER_TEST_SRCS}
)
target_include_directories(${case} PUBLIC
${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest)
target_link_libraries(${case}
PRIVATE
-Wl,--start-group
oceanbase_static
ob_sql_static
ob_storage_static
-Wl,--end-group
-static-libgcc
-static-libstdc++
gtest
gmock)
endfunction()

ob_offline_observer(test_simple_ob test_ob_simple_cluster.cpp)
ob_offline_observer(test_transfer_tx test_transfer_tx.cpp)

ob_unittest_observer(test_transfer_no_kill_tx test_transfer_tx.cpp)
ob_unittest_observer(test_standby_balance test_standby_balance_ls_group.cpp)
ob_unittest_observer(test_ls_recover test_ls_recover.cpp)
ob_unittest_observer(test_ob_simple_cluster test_ob_simple_cluster.cpp)
Expand Down
7 changes: 4 additions & 3 deletions mittest/simple_server/env/ob_simple_cluster_test_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ int ObSimpleClusterTestBase::close()
int ObSimpleClusterTestBase::create_tenant(const char *tenant_name,
const char *memory_size,
const char *log_disk_size,
const bool oracle_mode)
const bool oracle_mode,
int64_t tenant_cpu)
{
SERVER_LOG(INFO, "create tenant start");
int32_t log_level;
Expand Down Expand Up @@ -228,8 +229,8 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name,
{
ObSqlString sql;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sql.assign_fmt("create resource unit %s%s max_cpu 2, memory_size '%s', log_disk_size='%s';",
UNIT_BASE, tenant_name, memory_size, log_disk_size))) {
} else if (OB_FAIL(sql.assign_fmt("create resource unit %s%s max_cpu %ld, memory_size '%s', log_disk_size='%s';",
UNIT_BASE, tenant_name, tenant_cpu, memory_size, log_disk_size))) {
SERVER_LOG(WARN, "create_tenant", K(ret));
} else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) {
SERVER_LOG(WARN, "create_tenant", K(ret));
Expand Down
3 changes: 2 additions & 1 deletion mittest/simple_server/env/ob_simple_cluster_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class ObSimpleClusterTestBase : public testing::Test
int create_tenant(const char *tenant_name = "tt1",
const char *memory_size = "2G",
const char *log_disk_size = "2G",
const bool oracle_mode = false);
const bool oracle_mode = false,
int64_t tenant_cpu = 2);
int delete_tenant(const char *tenant_name = "tt1");
int get_tenant_id(uint64_t &tenant_id, const char *tenant_name = "tt1");
int exec_write_sql_sys(const char *sql_str, int64_t &affected_rows);
Expand Down
2 changes: 1 addition & 1 deletion mittest/simple_server/env/ob_simple_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ int ObSimpleServer::init_sql_proxy2(const char *tenant_name, const char *db_name
param.long_query_timeout_ = 300*1000*1000; // 120s
param.connection_refresh_interval_ = 200*1000; // 200ms
param.connection_pool_warn_time_ = 10*1000*1000; // 1s
param.sqlclient_per_observer_conn_limit_ = 1000;
param.sqlclient_per_observer_conn_limit_ = 10000;
ret = sql_conn_pool2_.init(db_addr, param);
if (OB_SUCC(ret)) {
sql_conn_pool2_.set_mode(common::sqlclient::ObMySQLConnection::DEBUG_MODE);
Expand Down
Loading

0 comments on commit ff0ec7e

Please sign in to comment.