Add insert row limit and change chunk size to 64K again (#1526)
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

Issue link: #1497

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Signed-off-by: Jin Hai <[email protected]>
JinHai-CN authored Jul 25, 2024
1 parent 0e6520f commit 6ddfa5c
Showing 17 changed files with 57 additions and 58 deletions.
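This commit caps a single insert statement at 8192 rows (the new INSERT_BATCH_ROW_LIMIT in src/common/default_values.cppm) and rejects larger batches with a recoverable error instead of the old unrecoverable block-capacity check. A minimal sketch of the client-side workaround, assuming a `table_obj` obtained from the Python SDK as in the tests below; the helper name and the sample rows are illustrative, not part of this PR:

```python
# Sketch: split a large insert into batches that respect the new 8192-row limit.
# Assumes `table_obj` was created via the Python SDK, e.g.
#   db_obj = infinity_obj.get_database("default_db")
#   table_obj = db_obj.create_table(...)
INSERT_BATCH_ROW_LIMIT = 8192  # mirrors src/common/default_values.cppm

def insert_in_batches(table_obj, rows):
    """Insert `rows` (a list of dicts) in chunks of at most 8192 rows."""
    for start in range(0, len(rows), INSERT_BATCH_ROW_LIMIT):
        table_obj.insert(rows[start:start + INSERT_BATCH_ROW_LIMIT])

# Example: 20,000 rows become three inserts (8192 + 8192 + 3616).
# insert_in_batches(table_obj, [{"c1": i} for i in range(20000)])
```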
6 changes: 0 additions & 6 deletions .github/workflows/tests.yml
@@ -68,9 +68,6 @@ jobs:
if: ${{ !cancelled() }}
run: cat unittest_debug.log 2>/dev/null || true

- name: Download resources
run: rm -rf resource && git clone --depth=1 https://github.com/infiniflow/resource.git

- name: Install pysdk
if: ${{ !cancelled() && !failure() }}
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 uninstall -y infinity-sdk && pip3 install . -v --config-settings=cmake.build-type='Debug' --config-settings=build-dir='cmake-build-debug'"
@@ -156,9 +153,6 @@ jobs:
if: ${{ !cancelled() }}
run: cat unittest_release.log 2>/dev/null || true

- name: Download resources
run: rm -rf resource && git clone --depth=1 https://github.com/infiniflow/resource.git

- name: Install pysdk for Python 3.10
if: ${{ !cancelled() && !failure() }}
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 uninstall -y infinity-sdk && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release'"
2 changes: 1 addition & 1 deletion python/test/cases/test_insert.py
@@ -68,7 +68,7 @@ def test_insert_with_exceeding_invalid_value_range(self, values):
self.test_infinity_obj._test_insert_with_exceeding_invalid_value_range(values)

# batch insert, within limit
@pytest.mark.parametrize("batch", [10, 1024, 2048])
@pytest.mark.parametrize("batch", [10, 1024, 2048, 8192])
def test_batch_insert_within_limit(self, batch):
self.test_infinity_obj._test_batch_insert_within_limit(batch)

13 changes: 6 additions & 7 deletions python/test/internal/test_insert.py
@@ -253,7 +253,7 @@ def _test_insert_tensor_array(self):
{'c1': ([[[1, 2], [3, 4]], [[5, 6]]], [[[7, 8]], [[9, 10], [11, 12]]], [[[13, 14], [15, 16], [17, 18]]])}))
res = db_obj.drop_table("test_insert_tensor_array", ConflictType.Error)
assert res.error_code == ErrorCode.OK

def _test_insert_sparse(self):
"""
target: test insert sparse column
@@ -351,12 +351,12 @@ def _test_insert_exceed_block_size(self):
assert table_obj
values = [{"c1": 1} for _ in range(8193)]

with pytest.raises(Exception,
match=".*Insert values row count 8193 is larger than default block capacity 8192*"):
with pytest.raises(InfinityException) as exception:
table_obj.insert(values)
assert exception.type == InfinityException
assert exception.value.args[0] == "Insert batch row limit shouldn\'t more than 8193"

res = db_obj.drop_table(
"test_insert_exceed_block_size", ConflictType.Error)
res = db_obj.drop_table("test_insert_exceed_block_size", ConflictType.Error)
assert res.error_code == ErrorCode.OK


@@ -558,7 +558,7 @@ def _test_batch_insert_within_limit(self, batch):
"test_batch_insert_within_limit", ConflictType.Error)
assert res.error_code == ErrorCode.OK

# batch insert, batch size limit? 8192?
# batch insert, batch size limit 8192
def _test_batch_insert(self):
# connect
db_obj = self.infinity_obj.get_database("default_db")
@@ -575,7 +575,6 @@ def _test_batch_insert(self):
res = db_obj.drop_table("test_batch_insert", ConflictType.Error)
assert res.error_code == ErrorCode.OK


# batch insert, with invalid data type inside.
def _test_insert_with_invalid_data_type(self, batch, types):
# connect
22 changes: 12 additions & 10 deletions python/test_http_api/httpapibase.py
@@ -39,29 +39,31 @@ def set_up_data(self, param=[], tp={}):
# Post operation

def tear_down(self, resp, expect={}):
print("status_code:" + str(resp.status_code))
# print("status_code:" + str(resp.status_code))
if expect.get("status_code", None) is not None:
print(f"expect: {expect['status_code']}, actual: {resp.status_code}")
if resp.status_code != expect['status_code']:
logging.error(f"expect: {expect['status_code']}, actual: {resp.status_code}")
assert resp.status_code == expect['status_code']
else:
print(f"actual: {resp.status_code}, expect: {expected_status_code}")
assert resp.status_code == expected_status_code
if resp.status_code != expected_status_code:
logging.error(f"actual: {resp.status_code}, expect: {expected_status_code}")

resp_json = resp.json()
# print(resp_json)
for item in expect.items():
if resp_json.get(item[0], None) is None:
continue
print("[" + str(item[0]) + "]: " + "resp:" + str(resp_json[item[0]]) + ", expect:" + str(item[1]))
assert str(resp_json[item[0]]) == str(item[1])
print("----------------------------------------------")
if str(resp_json[item[0]]) != str(item[1]):
logging.error("[" + str(item[0]) + "]: " + "resp:" + str(resp_json[item[0]]) + ", expect:" + str(item[1]))
assert False
# print("----------------------------------------------")
return

def request(self, url, method, header={}, data={}):
if header is None:
header = {}
url = default_url + url
print("url: " + url)
# print("url: " + url)
match method:
case "get":
response = requests.get(url, headers=header, json=data)
@@ -160,7 +162,7 @@ def create_table(self, db_name, table_name, fields=[], properties=[], expect={
d = self.set_up_data(['create_option'],
{'fields': fields, 'properties': properties, 'create_option': baseCreateOptions[opt]})
r = self.request(url, "post", h, d)
print(r)
# print(r)
self.tear_down(r, expect)
return

@@ -237,7 +239,7 @@ def drop_index(self, db_name, table_name, index_name, expect={
if exists is not None:
copt = baseDropOptions[opt]

print("copt:"+copt)
# print("copt:"+copt)

url = f"databases/{db_name}/tables/{table_name}/indexes/{index_name}"

2 changes: 1 addition & 1 deletion python/test_http_api/httputils.py
@@ -22,7 +22,7 @@ def wrapped_func(*args, **kwargs):
try:
func(*args, **kwargs)
except:
print("Expected exception in " + func.__name__)
# print("Expected exception in " + func.__name__)
traceback.print_exc()

return wrapped_func
15 changes: 8 additions & 7 deletions python/test_http_api/test_database.py
@@ -1,3 +1,4 @@
import logging
import os
import sys
import pytest
@@ -13,7 +14,7 @@ def test_http_verison(self):
pass

def test_http_database(self):

logging.error("test_http_databasetest_http_databasetest_http_databasetest_http_databasetest_http_databasetest_http_databasetest_http_databasetest_http_databasetest_http_database")
db_name = "test_http_my_database"

self.drop_database(db_name)
@@ -78,7 +79,7 @@ def test_http_create_drop_show_1K_databases(self):
# create databases
db_count = 100
for i in range(db_count):
print('create test_http_db_name' + str(i))
# print('create test_http_db_name' + str(i))
self.drop_database('test_http_db_name' + str(i))
for i in range(db_count):
self.create_database('test_http_db_name' + str(i))
@@ -88,14 +89,14 @@ def test_http_create_drop_show_1K_databases(self):
# list all databases
http_database_count = 0
for db_name in dbs:
print("db name:" + db_name)
# print("db name:" + db_name)
if db_name.startswith("test_http") or db_name == "default_db":
http_database_count += 1
assert http_database_count == (db_count + 1)

# drop databases
for i in range(db_count):
print('drop test_http_db_name' + str(i))
# print('drop test_http_db_name' + str(i))
self.drop_database('test_http_db_name' + str(i))
self.clear_http_database()

@@ -109,7 +110,7 @@ def test_http_create_drop_show_100K_databases(self):
# create databases
db_count = 100000
for i in range(db_count):
print('create test_http_db_name' + str(i))
# print('create test_http_db_name' + str(i))
self.drop_database('test_http_db_name' + str(i))
for i in range(db_count):
self.create_database('test_http_db_name' + str(i))
@@ -119,14 +120,14 @@ def test_http_create_drop_show_100K_databases(self):
# list all databases
http_database_count = 0
for db_name in dbs:
print("db name:" + db_name)
# print("db name:" + db_name)
if db_name.startswith("test_http") or db_name == "default_db":
http_database_count += 1
assert http_database_count == (db_count + 1)

# drop databases
for i in range(db_count):
print('drop test_http_db_name' + str(i))
# print('drop test_http_db_name' + str(i))
self.drop_database('test_http_db_name' + str(i))


8 changes: 4 additions & 4 deletions python/test_http_api/test_import.py
@@ -34,7 +34,7 @@ def test_http_import(self):
])

test_csv_dir = self.get_project_path() + TEST_DATA_DIR + file_name
print("test_csv_dir: " + test_csv_dir)
# print("test_csv_dir: " + test_csv_dir)
assert os.path.exists(test_csv_dir)

self.import_data(db_name, table_name, {
@@ -115,7 +115,7 @@ def test_http_import_different_file_format_data(self):
"header": False,
"delimiter": ","
})
print(file_path)
# print(file_path)
else:
file_path = self.get_project_path() + TEST_DATA_DIR + format + "/embedding_int_dim3." + format
assert os.path.exists(file_path)
@@ -354,7 +354,7 @@ def test_http_import_csv_with_headers(self):
])

test_csv_dir = self.get_project_path() + TEST_DATA_DIR + file_name
print("test_csv_dir: " + test_csv_dir)
# print("test_csv_dir: " + test_csv_dir)
assert os.path.exists(test_csv_dir)

self.import_data(db_name, table_name, {
@@ -392,7 +392,7 @@ def test_http_import_fvecs_table_with_more_columns(self):
}
])
test_csv_dir = TEST_TMP_DIR + file_name
print("test_file_dir: " + test_csv_dir)
# print("test_file_dir: " + test_csv_dir)
assert os.path.exists(test_csv_dir)

self.import_data(db_name, table_name, {
4 changes: 2 additions & 2 deletions python/test_http_api/test_index.py
@@ -527,7 +527,7 @@ def test_http_insert_data_fulltext_index_search(self):
{"name": "body", "type": "varchar"}
])
test_csv_dir = TEST_TMP_DIR + "enwiki_99.csv"
print("test_csv_dir: " + test_csv_dir)
# print("test_csv_dir: " + test_csv_dir)
assert os.path.exists(test_csv_dir)

self.create_index(db_name, table_name, idxname, ["body"], {
@@ -558,7 +558,7 @@ def test_http_fulltext_match_with_invalid_analyzer(self):
])
httputils.copy_data("enwiki_9.csv")
test_csv_dir = TEST_TMP_DIR + "enwiki_9.csv"
print("test_csv_dir: " + test_csv_dir)
# print("test_csv_dir: " + test_csv_dir)
assert os.path.exists(test_csv_dir)

self.import_data(db_name, table_name, {
6 changes: 3 additions & 3 deletions python/test_http_api/test_index_parallel.py
@@ -28,11 +28,11 @@ def write_worker(data, file_path, end_time, thread_id):
value.append({"doctitle": data["doctitle"][i],
"docdate": data["docdate"][i], "body": data["body"][i]})
self.insert(db_name, table_name, value)
print(f"thread {thread_id}: insert complete")
# print(f"thread {thread_id}: insert complete")
if operation == 1:
print(f"thread {thread_id}: begin import")
# print(f"thread {thread_id}: begin import")
self.import_data(file_path, {"delimiter": "\t"})
print(f"thread {thread_id}: import complete")
# print(f"thread {thread_id}: import complete")

def read_worker(end_time):
db_name = "default_db"
4 changes: 2 additions & 2 deletions python/test_http_api/test_insert.py
@@ -153,7 +153,7 @@ def test_http_insert_exceed_block_size(self):
values = [{"c1": 1} for _ in range(8193)]
self.insert(db_name, table_name, values, expect={
"status_code": 500,
"error_code": 7011
"error_code": 3032
})
self.drop_table(db_name, table_name)
return
@@ -378,7 +378,7 @@ def test_http_insert_with_invalid_data_type(self, batch, types):
"error_code": 3032,
})
else:
print(values)
# print(values)
self.insert(db_name, table_name, values)
self.drop_table(db_name, table_name)
return
10 changes: 5 additions & 5 deletions python/test_http_api/test_table.py
@@ -132,7 +132,7 @@ def test_http_table(self):
def test_http_show_tables(self):
self.show_database("default_db")
tblist = self.get_all_tables("default_db")
print(tblist)
# print(tblist)
return

def test_http_create_varchar_table(self):
@@ -486,7 +486,7 @@ def test_http_create_1k_table(self):
)
# get all tables
res = self.get_all_tables(db_name)
print("table nums: " + str(len(res)))
# print("table nums: " + str(len(res)))
# assert len(res) == tb_count

# show table
@@ -522,7 +522,7 @@ def test_http_create_100K_table(self):
)
# get all tables
res = self.get_all_tables(db_name)
print("table nums: " + str(len(res)))
# print("table nums: " + str(len(res)))
# assert len(res) == tb_count

# show table
@@ -686,7 +686,7 @@ def test_http_table_create_valid_option(self):
'create_option': str(i)})

r = self.request(url, "post", h, d)
print(r)
# print(r)
self.tear_down(r, {
"status_code": 200,
"error_code": 0,
@@ -706,7 +706,7 @@ def test_http_table_create_invalid_option(self):
{'fields': [{"name": "c1", "type": "integer"}], 'properties': {},
'create_option': str(i)})
r = self.request(url, "post", h, d)
print(r)
# print(r)
self.tear_down(r, {
"status_code": 200,
"error_code": 0,
5 changes: 4 additions & 1 deletion src/common/default_values.cppm
@@ -57,8 +57,9 @@ export {
constexpr u64 MIN_VECTOR_CHUNK_SIZE = 4096UL;
constexpr u64 MAX_VECTOR_CHUNK_SIZE = 1024 * 1024UL;
constexpr u64 MAX_VECTOR_CHUNK_COUNT = std::numeric_limits<u64>::max();

// Each row has one chunk.
constexpr u64 DEFAULT_FIXLEN_CHUNK_SIZE = 1024 * 1024; // 1MB
constexpr u64 DEFAULT_FIXLEN_CHUNK_SIZE = 65536L; // 64KB
constexpr u64 DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE = 8192UL * 128UL * 8UL;

// segment related constants
@@ -173,6 +174,8 @@
constexpr SizeT DEFAULT_LOG_FILE_SIZE = 64 * 1024lu * 1024lu; // 64MB
constexpr std::string_view DEFAULT_LOG_FILE_SIZE_STR = "64MB"; // 64MB

constexpr SizeT INSERT_BATCH_ROW_LIMIT = 8192;

// default persistence parameter
constexpr std::string_view DEFAULT_PERSISTENCE_DIR = ""; // Empty means disabled
constexpr std::string_view DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT_STR = "100MB"; // 100MB
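A quick sanity check of the arithmetic behind these constants, using values visible in this commit (the hunks above plus the old block-capacity error message in physical_insert.cpp); treating the three 8192s as deliberately aligned is an inference, not something the PR states:

```python
# Constants as they appear after this commit (see the hunks above and the
# removed check in physical_insert.cpp below).
DEFAULT_FIXLEN_CHUNK_SIZE = 65536   # 64 KiB, restored by this commit
DEFAULT_BLOCK_CAPACITY = 8192       # quoted in the old "default block capacity 8192" error
INSERT_BATCH_ROW_LIMIT = 8192       # added by this commit

# A 64 KiB fixed-length chunk holds exactly 8192 eight-byte values, matching
# the block capacity and the new per-insert row limit.
assert DEFAULT_FIXLEN_CHUNK_SIZE // 8 == DEFAULT_BLOCK_CAPACITY == INSERT_BATCH_ROW_LIMIT
```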
7 changes: 0 additions & 7 deletions src/executor/operator/physical_insert.cpp
@@ -50,13 +50,6 @@ bool PhysicalInsert::Execute(QueryContext *query_context, OperatorState *operato
String error_message = fmt::format("Insert values count{} isn't matched with table column count{}.", column_count, table_collection_column_count);
UnrecoverableError(error_message);
}
if (row_count > DEFAULT_BLOCK_CAPACITY) {
// Fixme: insert batch can larger than 8192, but currently we limit it.
Status status = Status::UnexpectedError(fmt::format("Insert values row count {} is larger than default block capacity {}.", row_count, DEFAULT_BLOCK_CAPACITY));
RecoverableError(status);
// UnrecoverableError(
// fmt::format("Insert values row count {} is larger than default block capacity {}.", row_count, DEFAULT_BLOCK_CAPACITY));
}

// Prepare the output block
Vector<SharedPtr<DataType>> output_types;
2 changes: 1 addition & 1 deletion src/main/config.cpp
@@ -85,7 +85,7 @@ Status Config::ParseTimeInfo(const String &time_info, i64 &time_seconds) {
u64 time_number = 0;
for (SizeT i = 0; i < info_size - 1; ++i) {
if (std::isdigit(time_info[i])) {
time_number += time_number * 10 + (time_info[i] - '0');
time_number = time_number * 10 + (time_info[i] - '0');
} else {
return Status::InvalidTimeInfo(time_info);
}
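The one-character change above fixes an accumulation bug in ParseTimeInfo: with `+=`, each digit added the new running value on top of the previous total, so any multi-digit duration parsed too large. A small Python sketch of the two behaviours (illustrative only; the real code walks the C++ `time_info` string):

```python
def parse_old(digits: str) -> int:
    # Old behaviour: time_number += time_number * 10 + digit
    n = 0
    for ch in digits:
        n += n * 10 + int(ch)
    return n

def parse_new(digits: str) -> int:
    # Fixed behaviour: time_number = time_number * 10 + digit
    n = 0
    for ch in digits:
        n = n * 10 + int(ch)
    return n

# "30s" -> digits "30": the old code yields 33 seconds, the fix yields 30.
assert parse_old("30") == 33
assert parse_new("30") == 30
```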
5 changes: 5 additions & 0 deletions src/planner/logical_planner.cpp
@@ -239,6 +239,11 @@ Status LogicalPlanner::BuildInsertValue(const InsertStatement *statement, Shared
// Create value list
Vector<Vector<SharedPtr<BaseExpression>>> value_list_array;
SizeT value_count = statement->values_->size();
if(value_count > INSERT_BATCH_ROW_LIMIT) {
Status status = Status::NotSupport("Insert batch row limit shouldn't more than 8192.");
RecoverableError(status);
}

for (SizeT idx = 0; idx < value_count; ++idx) {
const auto *parsed_expr_list = statement->values_->at(idx);

1 change: 1 addition & 0 deletions tools/run_http_api.py
@@ -17,6 +17,7 @@ def http_api_test(http_api_dir: str, pytest_mark: str):
"-m",
"pytest",
"--tb=line",
"-s",
"-x",
"-m",
pytest_mark,