Skip to content

Commit

Permalink
add test cases of parallel_test (#2072)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change

- [x] Test cases
  • Loading branch information
zjbpaul1317 authored Oct 19, 2024
1 parent 2c3e0de commit 59e2611
Showing 1 changed file with 146 additions and 1 deletion.
147 changes: 146 additions & 1 deletion python/parallel_test/test_index_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
TEST_DATA_DIR = "/test/data/"
kInsertThreadNum = 4
kRuningTime = 10

kThreadNum = 4

class TestIndexParallel(TestSdk):

Expand Down Expand Up @@ -254,3 +254,148 @@ def read_worker(connection_pool: ConnectionPool, end_time, knn_column_name, knn_
"test_vector_index_parallel", ConflictType.Error)
assert res.error_code == ErrorCode.OK
connection_pool.release_conn(infinity_obj)

def test_index_creation_deletion_parallel(self, get_infinity_connection_pool):

def index_worker(connection_pool: ConnectionPool, table_name, column_name, index_name, end_time, thread_id):
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table(table_name)

while time.time() < end_time:
res = table_obj.create_index(index_name,
index.IndexInfo(column_name,
index.IndexType.FullText),
ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
print(f"thread {thread_id}: index {index_name} created")
else:
print(f"thread {thread_id}: create_index {index_name} failed: {res.error_msg}")
time.sleep(0.5)
res = table_obj.drop_index(index_name, ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
print(f"thread {thread_id}: index {index_name} deleted")
else:
print(f"thread {thread_id}: delete_index {index_name} failed: {res.error_msg}")
time.sleep(0.5)

connection_pool.release_conn(infinity_obj)

def insert_worker(connection_pool: ConnectionPool, table_name, data, end_time, thread_id):
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table(table_name)

while time.time() < end_time:
value = []
for i in range(len(data["doctitle"])):
value.append({"doctitle": data["doctitle"][i],
"docdate": data["docdate"][i], "body": data["body"][i]})
table_obj.insert(value)
print(f"thread {thread_id}: put data")
time.sleep(1)

connection_pool.release_conn(infinity_obj)

def query_worker(connection_pool: ConnectionPool, table_name, end_time, thread_id):
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table(table_name)

while time.time() < end_time:
try:
res = table_obj.output(["doctitle", "docdate", "_row_id", "_score"]).match_text(
"body^5", "harmful chemical", 3).to_pl()
print(f"thread {thread_id}: check result:\n{res}")
except Exception as e:
print(f"thread {thread_id}: check failed: {e}")
time.sleep(0.5)

connection_pool.release_conn(infinity_obj)

column_names = ["doctitle", "docdate", "body"]
file_path = os.getcwd() + TEST_DATA_DIR + "csv" + "/enwiki_99.csv"
df = pandas.read_csv(file_path,
delimiter="\t",
header=None,
names=column_names)
data = {key: list(value.values())
for key, value in df.to_dict().items()}

connection_pool = get_infinity_connection_pool
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_name = "test_index_creation_deletion_parallel"
res = db_obj.drop_table(table_name, ConflictType.Ignore)
assert res.error_code == ErrorCode.OK
table_obj = db_obj.create_table(table_name, {
"doctitle": {"type": "varchar"},
"docdate": {"type": "varchar"},
"body": {"type": "varchar"}}, ConflictType.Error)
assert table_obj is not None

threads = []
end_time = time.time() + kRuningTime

for i in range(kThreadNum):
threads.append(Thread(target=index_worker, args=[
connection_pool, table_name, "body", "body_index", end_time, i]))

for i in range(kThreadNum):
threads.append(Thread(target=insert_worker, args=[
connection_pool, table_name, data, end_time, i + kThreadNum]))

for i in range(kThreadNum):
threads.append(Thread(target=query_worker, args=[
connection_pool, table_name, end_time, i + 2*kThreadNum]))

for t in threads:
t.start()

for t in threads:
t.join()

res = db_obj.drop_table(table_name, ConflictType.Ignore)
assert res.error_code == ErrorCode.OK
connection_pool.release_conn(infinity_obj)

def test_table_creation_deletion_parallel(self, get_infinity_connection_pool):

def create_table_worker(connection_pool: ConnectionPool, table_name_prefix, end_time, thread_id):
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_counter = 0
while time.time() < end_time:
table_name = f"{table_name_prefix}_{thread_id}_{table_counter}"
table_counter += 1
res = db_obj.create_table(table_name, {
"id": {"type": "int"},
"value": {"type": "varchar"}}, ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
print(f"thread {thread_id}: table {table_name} created")
else:
print(f"thread {thread_id}: create table {table_name} failed: {res.error_msg}")
time.sleep(0.5)
res = db_obj.drop_table(table_name, ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
print(f"thread {thread_id}: table {table_name} deleted")
else:
print(f"thread {thread_id}: delete table {table_name} failed: {res.error_msg}")
time.sleep(0.5)
connection_pool.release_conn(infinity_obj)

connection_pool = get_infinity_connection_pool
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_name_prefix = "test_table_creation_deletion"

threads = []
end_time = time.time() + kRuningTime
for i in range(kThreadNum):
threads.append(Thread(target=create_table_worker, args=[
connection_pool, table_name_prefix, end_time, i]))
for t in threads:
t.start()
for t in threads:
t.join()
connection_pool.release_conn(infinity_obj)

0 comments on commit 59e2611

Please sign in to comment.