From 64cfab8d0b86765eceae52c0a192bfbf4f976232 Mon Sep 17 00:00:00 2001
From: zjbpaul1317
Date: Sat, 19 Oct 2024 19:34:38 +0800
Subject: [PATCH] add test cases for parallel_test

---
 python/parallel_test/test_index_parallel.py | 149 ++++++++++++++++++++-
 1 file changed, 148 insertions(+), 1 deletion(-)

diff --git a/python/parallel_test/test_index_parallel.py b/python/parallel_test/test_index_parallel.py
index b05f02ccd2..8e06428eaa 100644
--- a/python/parallel_test/test_index_parallel.py
+++ b/python/parallel_test/test_index_parallel.py
@@ -15,7 +15,7 @@
 TEST_DATA_DIR = "/test/data/"
 kInsertThreadNum = 4
 kRuningTime = 10
-
+kThreadNum = 4
 
 
 class TestIndexParallel(TestSdk):
@@ -254,3 +254,150 @@ def read_worker(connection_pool: ConnectionPool, end_time, knn_column_name, knn_
             "test_vector_index_parallel", ConflictType.Error)
         assert res.error_code == ErrorCode.OK
         connection_pool.release_conn(infinity_obj)
+
+    def test_index_creation_deletion_parallel(self, get_infinity_connection_pool):
+
+        def index_worker(connection_pool: ConnectionPool, table_name, column_name, index_name, end_time, thread_id):
+            infinity_obj = connection_pool.get_conn()
+            db_obj = infinity_obj.get_database("default_db")
+            table_obj = db_obj.get_table(table_name)
+
+            while time.time() < end_time:
+                res = table_obj.create_index(index_name,
+                                             index.IndexInfo(column_name,
+                                                             index.IndexType.FullText),
+                                             ConflictType.Ignore)
+                if res.error_code == ErrorCode.OK:
+                    print(f"thread {thread_id}: index {index_name} created")
+                else:
+                    print(f"thread {thread_id}: create_index {index_name} failed: {res.error_msg}")
+                time.sleep(0.5)
+                res = table_obj.drop_index(index_name, ConflictType.Ignore)
+                if res.error_code == ErrorCode.OK:
+                    print(f"thread {thread_id}: index {index_name} deleted")
+                else:
+                    print(f"thread {thread_id}: delete_index {index_name} failed: {res.error_msg}")
+                time.sleep(0.5)
+
+            connection_pool.release_conn(infinity_obj)
+
+        def insert_worker(connection_pool: ConnectionPool, table_name, data, end_time, thread_id):
+            infinity_obj = connection_pool.get_conn()
+            db_obj = infinity_obj.get_database("default_db")
+            table_obj = db_obj.get_table(table_name)
+
+            while time.time() < end_time:
+                value = []
+                for i in range(len(data["doctitle"])):
+                    value.append({"doctitle": data["doctitle"][i],
+                                  "docdate": data["docdate"][i], "body": data["body"][i]})
+                table_obj.insert(value)
+                print(f"thread {thread_id}: put data")
+                time.sleep(1)
+
+            connection_pool.release_conn(infinity_obj)
+
+        def query_worker(connection_pool: ConnectionPool, table_name, end_time, thread_id):
+            infinity_obj = connection_pool.get_conn()
+            db_obj = infinity_obj.get_database("default_db")
+            table_obj = db_obj.get_table(table_name)
+
+            while time.time() < end_time:
+                try:
+                    res = table_obj.output(["doctitle", "docdate", "_row_id", "_score"]).match_text(
+                        "body^5", "harmful chemical", 3).to_pl()
+                    print(f"thread {thread_id}: check result:\n{res}")
+                except Exception as e:
+                    print(f"thread {thread_id}: check failed: {e}")
+                time.sleep(0.5)
+
+            connection_pool.release_conn(infinity_obj)
+
+        column_names = ["doctitle", "docdate", "body"]
+        file_path = os.getcwd() + TEST_DATA_DIR + "csv" + "/enwiki_99.csv"
+        df = pandas.read_csv(file_path,
+                             delimiter="\t",
+                             header=None,
+                             names=column_names)
+        data = {key: list(value.values())
+                for key, value in df.to_dict().items()}
+
+        connection_pool = get_infinity_connection_pool
+        infinity_obj = connection_pool.get_conn()
+        db_obj = infinity_obj.get_database("default_db")
+        table_name = "test_index_creation_deletion_parallel"
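+        # Reset state first: with ConflictType.Ignore the drop below is a no-op
+        # when the table does not exist, so repeated runs start from a clean slate.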
+        res = db_obj.drop_table(table_name, ConflictType.Ignore)
+        assert res.error_code == ErrorCode.OK
+        table_obj = db_obj.create_table(table_name, {
+            "doctitle": {"type": "varchar"},
+            "docdate": {"type": "varchar"},
+            "body": {"type": "varchar"}}, ConflictType.Error)
+        assert table_obj is not None
+
+        threads = []
+        end_time = time.time() + kRuningTime
+
+        for i in range(kThreadNum):
+            threads.append(Thread(target=index_worker, args=[
+                connection_pool, table_name, "body", "body_index", end_time, i]))
+
+        for i in range(kThreadNum):
+            threads.append(Thread(target=insert_worker, args=[
+                connection_pool, table_name, data, end_time, i + kThreadNum]))
+
+        for i in range(kThreadNum):
+            threads.append(Thread(target=query_worker, args=[
+                connection_pool, table_name, end_time, i + 2 * kThreadNum]))
+
+        for t in threads:
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        res = db_obj.drop_table(table_name, ConflictType.Ignore)
+        assert res.error_code == ErrorCode.OK
+        connection_pool.release_conn(infinity_obj)
+
+    def test_table_creation_deletion_parallel(self, get_infinity_connection_pool):
+
+        def create_table_worker(connection_pool: ConnectionPool, table_name_prefix, end_time, thread_id):
+            infinity_obj = connection_pool.get_conn()
+            db_obj = infinity_obj.get_database("default_db")
+            table_counter = 0
+            while time.time() < end_time:
+                table_name = f"{table_name_prefix}_{thread_id}_{table_counter}"
+                table_counter += 1
+                table_obj = db_obj.create_table(table_name, {
+                    "id": {"type": "int"},
+                    "value": {"type": "varchar"}}, ConflictType.Ignore)
+                if table_obj is not None:
+                    print(f"thread {thread_id}: table {table_name} created")
+                else:
+                    print(f"thread {thread_id}: create table {table_name} failed")
+                time.sleep(0.5)
+                res = db_obj.drop_table(table_name, ConflictType.Ignore)
+                if res.error_code == ErrorCode.OK:
+                    print(f"thread {thread_id}: table {table_name} deleted")
+                else:
+                    print(f"thread {thread_id}: delete table {table_name} failed: {res.error_msg}")
+                time.sleep(0.5)
+            connection_pool.release_conn(infinity_obj)
+
+        connection_pool = get_infinity_connection_pool
+        infinity_obj = connection_pool.get_conn()
+        db_obj = infinity_obj.get_database("default_db")
+        table_name_prefix = "test_table_creation_deletion"
+
+        threads = []
+        end_time = time.time() + kRuningTime
+        for i in range(kThreadNum):
+            threads.append(Thread(target=create_table_worker, args=[
+                connection_pool, table_name_prefix, end_time, i]))
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        connection_pool.release_conn(infinity_obj)
\ No newline at end of file