add test cases of parallel_test #2072

Merged · 1 commit · Oct 19, 2024
147 changes: 146 additions & 1 deletion python/parallel_test/test_index_parallel.py
@@ -15,7 +15,7 @@
TEST_DATA_DIR = "/test/data/"
kInsertThreadNum = 4
kRuningTime = 10

kThreadNum = 4

class TestIndexParallel(TestSdk):

@@ -254,3 +254,148 @@ def read_worker(connection_pool: ConnectionPool, end_time, knn_column_name, knn_
"test_vector_index_parallel", ConflictType.Error)
assert res.error_code == ErrorCode.OK
connection_pool.release_conn(infinity_obj)

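    # Stress test: some threads create and drop a full-text index while others
    # insert rows and run queries against the same table.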
    def test_index_creation_deletion_parallel(self, get_infinity_connection_pool):

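        # Worker: repeatedly create and then drop the full-text index, logging each outcome.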
        def index_worker(connection_pool: ConnectionPool, table_name, column_name, index_name, end_time, thread_id):
            infinity_obj = connection_pool.get_conn()
            db_obj = infinity_obj.get_database("default_db")
            table_obj = db_obj.get_table(table_name)

            while time.time() < end_time:
                res = table_obj.create_index(index_name,
                                             index.IndexInfo(column_name,
                                                             index.IndexType.FullText),
                                             ConflictType.Ignore)
                if res.error_code == ErrorCode.OK:
                    print(f"thread {thread_id}: index {index_name} created")
                else:
                    print(f"thread {thread_id}: create_index {index_name} failed: {res.error_msg}")
                time.sleep(0.5)
                res = table_obj.drop_index(index_name, ConflictType.Ignore)
                if res.error_code == ErrorCode.OK:
                    print(f"thread {thread_id}: index {index_name} deleted")
                else:
                    print(f"thread {thread_id}: delete_index {index_name} failed: {res.error_msg}")
                time.sleep(0.5)

            connection_pool.release_conn(infinity_obj)

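        # Worker: repeatedly insert the full CSV row set into the table.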
        def insert_worker(connection_pool: ConnectionPool, table_name, data, end_time, thread_id):
            infinity_obj = connection_pool.get_conn()
            db_obj = infinity_obj.get_database("default_db")
            table_obj = db_obj.get_table(table_name)

            while time.time() < end_time:
                value = []
                for i in range(len(data["doctitle"])):
                    value.append({"doctitle": data["doctitle"][i],
                                  "docdate": data["docdate"][i], "body": data["body"][i]})
                table_obj.insert(value)
                print(f"thread {thread_id}: put data")
                time.sleep(1)

            connection_pool.release_conn(infinity_obj)

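        # Worker: repeatedly run a full-text match query; exceptions are caught
        # and logged, since the index may not exist at query time.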
        def query_worker(connection_pool: ConnectionPool, table_name, end_time, thread_id):
            infinity_obj = connection_pool.get_conn()
            db_obj = infinity_obj.get_database("default_db")
            table_obj = db_obj.get_table(table_name)

            while time.time() < end_time:
                try:
                    res = table_obj.output(["doctitle", "docdate", "_row_id", "_score"]).match_text(
                        "body^5", "harmful chemical", 3).to_pl()
                    print(f"thread {thread_id}: check result:\n{res}")
                except Exception as e:
                    print(f"thread {thread_id}: check failed: {e}")
                time.sleep(0.5)

            connection_pool.release_conn(infinity_obj)

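        # Load the enwiki sample data and create a fresh target table.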
column_names = ["doctitle", "docdate", "body"]
file_path = os.getcwd() + TEST_DATA_DIR + "csv" + "/enwiki_99.csv"
df = pandas.read_csv(file_path,
delimiter="\t",
header=None,
names=column_names)
data = {key: list(value.values())
for key, value in df.to_dict().items()}

connection_pool = get_infinity_connection_pool
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_name = "test_index_creation_deletion_parallel"
res = db_obj.drop_table(table_name, ConflictType.Ignore)
assert res.error_code == ErrorCode.OK
table_obj = db_obj.create_table(table_name, {
"doctitle": {"type": "varchar"},
"docdate": {"type": "varchar"},
"body": {"type": "varchar"}}, ConflictType.Error)
assert table_obj is not None

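        # Launch kThreadNum workers of each kind; all share the same deadline.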
        threads = []
        end_time = time.time() + kRuningTime

        for i in range(kThreadNum):
            threads.append(Thread(target=index_worker, args=[
                connection_pool, table_name, "body", "body_index", end_time, i]))

        for i in range(kThreadNum):
            threads.append(Thread(target=insert_worker, args=[
                connection_pool, table_name, data, end_time, i + kThreadNum]))

        for i in range(kThreadNum):
            threads.append(Thread(target=query_worker, args=[
                connection_pool, table_name, end_time, i + 2 * kThreadNum]))

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        res = db_obj.drop_table(table_name, ConflictType.Ignore)
        assert res.error_code == ErrorCode.OK
        connection_pool.release_conn(infinity_obj)

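    # Stress test: each thread repeatedly creates and drops its own uniquely named tables.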
    def test_table_creation_deletion_parallel(self, get_infinity_connection_pool):

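        # Worker: create and then drop a uniquely named table in a loop, logging each outcome.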
        def create_table_worker(connection_pool: ConnectionPool, table_name_prefix, end_time, thread_id):
            infinity_obj = connection_pool.get_conn()
            db_obj = infinity_obj.get_database("default_db")
            table_counter = 0
            while time.time() < end_time:
                table_name = f"{table_name_prefix}_{thread_id}_{table_counter}"
                table_counter += 1
                res = db_obj.create_table(table_name, {
                    "id": {"type": "int"},
                    "value": {"type": "varchar"}}, ConflictType.Ignore)
                if res.error_code == ErrorCode.OK:
                    print(f"thread {thread_id}: table {table_name} created")
                else:
                    print(f"thread {thread_id}: create table {table_name} failed: {res.error_msg}")
                time.sleep(0.5)
                res = db_obj.drop_table(table_name, ConflictType.Ignore)
                if res.error_code == ErrorCode.OK:
                    print(f"thread {thread_id}: table {table_name} deleted")
                else:
                    print(f"thread {thread_id}: delete table {table_name} failed: {res.error_msg}")
                time.sleep(0.5)
            connection_pool.release_conn(infinity_obj)

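        # Run kThreadNum creator threads against the shared pool and wait for them to finish.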
        connection_pool = get_infinity_connection_pool
        infinity_obj = connection_pool.get_conn()
        db_obj = infinity_obj.get_database("default_db")
        table_name_prefix = "test_table_creation_deletion"

        threads = []
        end_time = time.time() + kRuningTime
        for i in range(kThreadNum):
            threads.append(Thread(target=create_table_worker, args=[
                connection_pool, table_name_prefix, end_time, i]))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        connection_pool.release_conn(infinity_obj)