From 663037ac3b150e2f81918c4dd94ad7962b5c470f Mon Sep 17 00:00:00 2001
From: Georgi Rusev
Date: Mon, 24 Feb 2025 18:06:47 +0200
Subject: [PATCH 1/4] init

---
 python/arcticdb/util/utils.py                 | 126 ++++++++++++++++++
 .../arcticdb/version_store/test_mem_leaks.py  |  86 +++++++++++-
 2 files changed, 211 insertions(+), 1 deletion(-)

diff --git a/python/arcticdb/util/utils.py b/python/arcticdb/util/utils.py
index 8cb392dccb..bb1df3e331 100644
--- a/python/arcticdb/util/utils.py
+++ b/python/arcticdb/util/utils.py
@@ -553,3 +553,129 @@ def generate_random_dataframe(cls, rows: int, cols: int, indexed: bool = True, s
         gen.add_timestamp_index("index", "s", pd.Timestamp(0))
         return gen.generate_dataframe()
 
+
+    @classmethod
+    def generate_random_int_dataframe(cls, start_name_prefix: str,
+                                      num_rows: int, num_cols: int,
+                                      dtype: ArcticIntType = np.int64, min_value: int = None, max_value: int = None,
+                                      seed: int = 3432) -> pd.DataFrame:
+        """
+        To be used to generate a large number of integer columns of the same dtype
+        when generation time is critical.
+        """
+        np.random.seed(seed=seed)
+        platform_int_info = np.iinfo("int_")
+        iinfo = np.iinfo(dtype)
+        if min_value is None:
+            min_value = max(iinfo.min, platform_int_info.min)
+        if max_value is None:
+            max_value = min(iinfo.max, platform_int_info.max)
+
+        data = np.random.randint(min_value, max_value, size=(num_rows, num_cols), dtype=dtype)
+        columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]
+
+        return pd.DataFrame(data=data, columns=columns)
+
+    @classmethod
+    def generate_random_float_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
+                                        dtype: ArcticFloatType = np.float64,
+                                        min_value: float = None, max_value: float = None, round_at: int = None,
+                                        seed: int = 54675) -> pd.DataFrame:
+        """
+        To be used to generate a large number of float columns of the same dtype
+        when generation time is critical.
+        """
+        # Larger bounds would trigger overflow in np.random.uniform, so clamp to (-1e307, 1e307)
+        # and to the minimum and maximum values of np.float32.
+        info = np.finfo(np.float32)
+        _max = info.max
+        _min = info.min
+        np.random.seed(seed)
+        if min_value is None:
+            min_value = max(-1e307, -sys.float_info.max, _min)
+        if max_value is None:
+            max_value = min(1e307, sys.float_info.max, _max)
+        if round_at is None:
+            data = np.random.uniform(min_value, max_value, size=(num_rows, num_cols)).astype(dtype)
+        else:
+            data = np.round(np.random.uniform(min_value, max_value,
+                                              size=(num_rows, num_cols)), round_at).astype(dtype)
+
+        columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]
+
+        return pd.DataFrame(data=data, columns=columns)
+
+    @classmethod
+    def generate_random_strings_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
+                                          column_sizes=None, seed: int = 4543):
+        """
+        To be used to generate a large number of string columns when generation time
+        is critical.
+        If `column_sizes` is not supplied, a default string length of 10 is used for every column.
+        """
+        if column_sizes is None:
+            column_sizes = [10] * num_cols
+        np.random.seed(seed=seed)
+        data = [[random_string(column_sizes[col])
+                 for col in range(num_cols)]
+                for _ in range(num_rows)]
+
+        columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]
+        return pd.DataFrame(data=data, columns=columns)
+
+    @classmethod
+    def generate_wide_dataframe(cls, num_rows: int, num_cols: int,
+                                num_string_cols: int,
+                                start_time: pd.Timestamp = None,
+                                freq: Union[str, timedelta, pd.Timedelta, pd.DateOffset] = 's',
+                                seed=23445):
+        """
+        Generates the specified number of columns as fast as possible.
+        Uses numpy random-array generation to do that.
+        String generation is the slowest part, so keep the number of string columns roughly between 1 and 1000.
+        The generated dataframe will also have an index starting at the specified `start_time`, if one is given.
+        """
+
+        cols, mod = divmod(num_cols - num_string_cols, 10)  # divide by the number of distinct dtype frames below
+
+        int_frame1 = cls.generate_random_int_dataframe("int8", num_rows=num_rows, num_cols=cols,
+                                                       dtype=np.int8, seed=seed)
+
+        int_frame2 = cls.generate_random_int_dataframe("int16", num_rows=num_rows, num_cols=cols,
+                                                       dtype=np.int16, seed=seed)
+
+        int_frame3 = cls.generate_random_int_dataframe("int32", num_rows=num_rows, num_cols=cols,
+                                                       dtype=np.int32, seed=seed)
+
+        int_frame4 = cls.generate_random_int_dataframe("int64", num_rows=num_rows, num_cols=cols + mod,
+                                                       dtype=np.int64, seed=seed)
+
+        uint_frame1 = cls.generate_random_int_dataframe("uint8", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint8, seed=seed)
+
+        uint_frame2 = cls.generate_random_int_dataframe("uint16", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint16, seed=seed)
+
+        uint_frame3 = cls.generate_random_int_dataframe("uint32", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint32, seed=seed)
+
+        uint_frame4 = cls.generate_random_int_dataframe("uint64", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint64, seed=seed)
+
+        float_frame1 = cls.generate_random_float_dataframe("float32", num_rows=num_rows, num_cols=cols,
+                                                           dtype=np.float32, seed=seed)
+
+        float_frame2 = cls.generate_random_float_dataframe("float64", num_rows=num_rows, num_cols=cols,
+                                                           dtype=np.float64, seed=seed)
+
+        str_frame = cls.generate_random_strings_dataframe("str", num_rows=num_rows, num_cols=num_string_cols)
+
+        frame: pd.DataFrame = pd.concat([int_frame1, int_frame2, int_frame3, int_frame4,
+                                         uint_frame1, uint_frame2, uint_frame3, uint_frame4,
+                                         float_frame1, float_frame2, str_frame], axis=1)  # concatenate horizontally
+
+        if start_time:
+            index_range = pd.date_range(start=start_time, periods=frame.shape[0], freq=freq, name='index')
+            frame.index = index_range
+
+        return frame
diff --git a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
index f18c2ed68f..dd4146a6ce 100644
--- a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
+++ b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
@@ -22,8 +22,9 @@
 import time
 
 import pandas as pd
-from typing import Generator, Tuple
+from typing import Generator, List, Tuple
 from arcticdb.util.test import get_sample_dataframe, random_string
+from arcticdb.util.utils import DFGenerator
 from arcticdb.version_store.library import Library, ReadRequest
 from arcticdb.version_store.processing import QueryBuilder
 from arcticdb.version_store._store import NativeVersionStore
@@ -721,7 +722,90 @@ def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_):
         logger.info("Test starting")
         st = time.time()
         data: pd.DataFrame = lib.read(symbol).data
+        lib.head
         del data
         logger.info(f"Test took : {time.time() - st}")
 
     gc.collect()
+
+    @pytest.fixture
+    def prepare_head_tails_symbol(basic_store_small_segment):
+        """
+        This fixture is part of test `test_mem_leak_head_tail_memray`
+        Should not be reused
+        """
+        store: NativeVersionStore = basic_store_small_segment
+        total_number_columns = 1002
+        symbol = "asdf12345"
+        num_rows_list = [279,199,350,999,1001]
+        snapshot_names = []
+        for rows in num_rows_list:
+            st = time.time()
+            df = DFGenerator.generate_wide_dataframe(num_rows=rows, num_cols=total_number_columns,
+                                                     num_string_cols=25,
+                                                     start_time=pd.Timestamp(0),seed=64578)
+            store.write(symbol,df)
+            snap = f"{symbol}_{rows}"
+            store.snapshot(snap)
+            snapshot_names.append(snap)
+            logger.info(f"Generated {rows} in {time.time() - st} sec")
+        all_columns = df.columns.to_list()
+        yield (store, symbol, num_rows_list, total_number_columns, snapshot_names, all_columns)
+        store.delete(symbol=symbol)
+
+    @MEMRAY_TESTS_MARK
+    @pytest.mark.limit_leaks(location_limit="52 KB", filter_fn=is_relevant)
+    def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
+        """
+        This test aims to check whether the `head` and `tail` functions leak memory.
+        The initial symbol (the test environment precondition) is created in a dedicated fixture
+        so that memray does not track that memory, as doing it here would slow the process down many times.
+        """
+
+        store: NativeVersionStore = None
+        (store, symbol, num_rows_list, total_number_columns, snapshot_names, all_columns) = prepare_head_tails_symbol
+
+        start_test = time.time()
+        min_rows = min(num_rows_list)
+        max_rows = max(num_rows_list)
+
+        np.random.seed = 959034
+        # constructing a list of head and tail rows to be selected
+        important_values = [0, min_rows, 1, max_rows, 0, -min_rows, -1, -max_rows, 2, -2, min_rows-1, - (min_rows-1)]
+        num_rows_to_select = important_values
+        num_rows_to_select.extend(np.random.randint(low=2, high=min_rows-10, size=7)) # add 7 more random values
+        # number of iterations will be the list length/size
+        iterations = len(num_rows_to_select)
+        # constructing a random list of values for snapshot names for each iteration
+        snapshots_list: List[str] = np.random.choice(snapshot_names, iterations)
+        # constructing a random list of values for versions names for each iteration
+        versions_list: List[int] = np.random.randint(0, len(num_rows_list) - 1, iterations)
+        # constructing a random list of values for column selection for each iteration
+        number_columns_list: List[int] = np.random.randint(0, len(all_columns)-1, iterations)
+
+        count = 0
+        for rows in num_rows_to_select:
+            number_columns = np.random.choice(all_columns, number_columns_list[count]).tolist()
+            number_columns = list(set(number_columns)) # take only unique columns
+            snap = snapshots_list[count]
+            ver = int(versions_list[count])
+            logger.info(f"rows {rows} / snapshot {snap}")
+            df1: pd.DataFrame = store.head(n=rows, as_of=snap, symbol=symbol).data
+            df2: pd.DataFrame = store.tail(n=rows, as_of=snap, symbol=symbol).data
+            df3: pd.DataFrame = store.head(n=rows, as_of=ver, symbol=symbol, columns=number_columns).data
+            difference = list(set(df3.columns.to_list()).difference(set(number_columns)))
+            assert len(difference) == 0, f"Columns not included : {difference}"
+            df4 = store.tail(n=rows, as_of=ver, symbol=symbol, columns=number_columns).data
+            difference = list(set(df4.columns.to_list()).difference(set(number_columns)))
+            assert len(difference) == 0, f"Columns not included : {difference}"
+
+            logger.info(f"Iteration {count} / {iterations} completed")
+            count += 1
+            del number_columns, df1, df2, df3, df4
+
+        del store, symbol, num_rows_list, snapshot_names, all_columns
+        del num_rows_to_select, important_values, snapshots_list, versions_list, number_columns_list
+        gc.collect()
+        time.sleep(10) # collection is not immediate
+        logger.info(f"Test completed in {time.time() - start_test}")
+
+    
\ No newline at end of file

From d7eb18be6235acdac9ab8895b816724f09bc9bc8 Mon Sep 17 00:00:00 2001
From: Georgi Rusev
Date: Tue, 25 Feb 2025 08:38:53 +0200
Subject: [PATCH 2/4] fix error

---
 python/tests/stress/arcticdb/version_store/test_mem_leaks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
index dd4146a6ce..96f402ec5a 100644
--- a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
+++ b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
@@ -768,7 +768,7 @@ def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
         min_rows = min(num_rows_list)
         max_rows = max(num_rows_list)
 
-        np.random.seed = 959034
+        np.random.seed(959034)
         # constructing a list of head and tail rows to be selected
         important_values = [0, min_rows, 1, max_rows, 0, -min_rows, -1, -max_rows, 2, -2, min_rows-1, - (min_rows-1)]
         num_rows_to_select = important_values

From bda814f391fd843c68d0443aed0ce37fc3d14b8c Mon Sep 17 00:00:00 2001
From: Georgi Rusev
Date: Tue, 25 Feb 2025 11:04:35 +0200
Subject: [PATCH 3/4] better version

---
 python/tests/conftest.py                      | 16 ++++++++
 .../arcticdb/version_store/test_mem_leaks.py  | 41 ++++++++++++++-----
 2 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index 45cb0cae72..08ae344965 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -128,6 +128,22 @@ def lmdb_storage(tmp_path) -> Generator[LmdbStorageFixture, None, None]:
 def lmdb_library(lmdb_storage, lib_name) -> Library:
     return lmdb_storage.create_arctic().create_library(lib_name)
 
+@pytest.fixture
+def lmdb_library_any(lmdb_storage, lib_name, request) -> Library:
+    """
+    Allows passing library creation parameters as parameters of the test or another fixture.
+    Example:
+
+
+        @pytest.mark.parametrize("lmdb_library_any", [
+            {'library_options': LibraryOptions(rows_per_segment=100, columns_per_segment=100)}
+        ], indirect=True)
+        def test_my_test(lmdb_library_any):
+            .....
+    """
+    params = request.param if hasattr(request, 'param') else {}
+    yield lmdb_storage.create_arctic().create_library(name=lib_name, **params)
+
 
 @pytest.fixture
 def lmdb_library_dynamic_schema(lmdb_storage, lib_name) -> Library:
diff --git a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
index 96f402ec5a..10bb7b9643 100644
--- a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
+++ b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
@@ -23,6 +23,8 @@ import pandas as pd
 from typing import Generator, List, Tuple
+from arcticdb.encoding_version import EncodingVersion
+from arcticdb.options import LibraryOptions
 from arcticdb.util.test import get_sample_dataframe, random_string
 from arcticdb.util.utils import DFGenerator
 from arcticdb.version_store.library import Library, ReadRequest
 from arcticdb.version_store.processing import QueryBuilder
 from arcticdb.version_store._store import NativeVersionStore
@@ -729,31 +731,47 @@ def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_):
     gc.collect()
 
     @pytest.fixture
-    def prepare_head_tails_symbol(basic_store_small_segment):
+    def prepare_head_tails_symbol(lmdb_library_any):
         """
         This fixture is part of test `test_mem_leak_head_tail_memray`
+
+        It creates a symbol with several versions and a snapshot for each version.
+        It inserts dataframes that are large relative to the library's segment size.
+        If the dynamic schema is used, each version has more columns than the previous one.
+
         Should not be reused
         """
-        store: NativeVersionStore = basic_store_small_segment
+        lib: Library = lmdb_library_any
+        opts: LibraryOptions = lib.options()
         total_number_columns = 1002
         symbol = "asdf12345"
-        num_rows_list = [279,199,350,999,1001]
+        num_rows_list = [279,199,1,350,999,0,1001]
         snapshot_names = []
         for rows in num_rows_list:
             st = time.time()
             df = DFGenerator.generate_wide_dataframe(num_rows=rows, num_cols=total_number_columns,
                                                      num_string_cols=25,
                                                      start_time=pd.Timestamp(0),seed=64578)
-            store.write(symbol,df)
+            lib.write(symbol,df)
             snap = f"{symbol}_{rows}"
-            store.snapshot(snap)
+            lib.snapshot(snap)
             snapshot_names.append(snap)
             logger.info(f"Generated {rows} in {time.time() - st} sec")
+            if opts.dynamic_schema:
+                # For dynamic schema we increase the number of columns on each iteration
+                total_number_columns += 20
+                logger.info(f"Total number of columns increased to {total_number_columns}")
+
         all_columns = df.columns.to_list()
-        yield (store, symbol, num_rows_list, total_number_columns, snapshot_names, all_columns)
-        store.delete(symbol=symbol)
+        yield (lib, symbol, num_rows_list, snapshot_names, all_columns)
+        lib.delete(symbol=symbol)
 
     @MEMRAY_TESTS_MARK
+    @pytest.mark.parametrize("lmdb_library_any", [
+        {'library_options': LibraryOptions(rows_per_segment=233, columns_per_segment=197, dynamic_schema=True, encoding_version=EncodingVersion.V2)},
+        {'library_options': LibraryOptions(rows_per_segment=99, columns_per_segment=99, dynamic_schema=False, encoding_version=EncodingVersion.V1)}
+    ], indirect=True)
     @pytest.mark.limit_leaks(location_limit="52 KB", filter_fn=is_relevant)
+
     def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
         """
         This test aims to check whether the `head` and `tail` functions leak memory.
@@ -762,7 +780,7 @@ def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
         """
 
         store: NativeVersionStore = None
-        (store, symbol, num_rows_list, total_number_columns, snapshot_names, all_columns) = prepare_head_tails_symbol
+        (store, symbol, num_rows_list, snapshot_names, all_columns) = prepare_head_tails_symbol
 
         start_test = time.time()
         min_rows = min(num_rows_list)
@@ -770,9 +788,10 @@ def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
 
         np.random.seed(959034)
         # constructing a list of head and tail rows to be selected
-        important_values = [0, min_rows, 1, max_rows, 0, -min_rows, -1, -max_rows, 2, -2, min_rows-1, - (min_rows-1)]
-        num_rows_to_select = important_values
-        num_rows_to_select.extend(np.random.randint(low=2, high=min_rows-10, size=7)) # add 7 more random values
+        num_rows_to_select = []
+        important_values = [0, 1, -1, 2, -2, max_rows, -max_rows]
+        num_rows_to_select.extend(important_values)
+        num_rows_to_select.extend(np.random.randint(low=5, high=99, size=7)) # add 7 more random values
         # number of iterations will be the list length/size
         iterations = len(num_rows_to_select)
         # constructing a random list of values for snapshot names for each iteration
         snapshots_list: List[str] = np.random.choice(snapshot_names, iterations)

From d1de17c65c961d2f000842dc59226eb118c121d6 Mon Sep 17 00:00:00 2001
From: Georgi Rusev
Date: Tue, 25 Feb 2025 15:59:43 +0200
Subject: [PATCH 4/4] improved version adapted for memory on different os-es

---
 .../arcticdb/version_store/test_mem_leaks.py  | 29 +++++++++++++++++--
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
index 41cffac4e2..4e815d084e 100644
--- a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
+++ b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
@@ -30,7 +30,7 @@
 from arcticdb.version_store.library import Library, ReadRequest
 from arcticdb.version_store.processing import QueryBuilder
 from arcticdb.version_store._store import NativeVersionStore
-from tests.util.mark import MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK
+from tests.util.mark import LINUX, MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK
 
 logging.basicConfig(level=logging.INFO)
 
@@ -593,6 +593,30 @@ def is_relevant(stack: Stack) -> bool:
             # do something to check if we need this to be added
             # as mem leak
             # print(f"SAMPLE >>> {frame.filename}:{frame.function}[{frame.lineno}]")
+            frame_info_str = f"{frame.filename}:{frame.function}:[{frame.lineno}]"
+
+            if "folly::CPUThreadPoolExecutor::CPUTask" in frame_info_str:
+                logger.warning(f"Frame excluded : {frame_info_str}")
+                logger.warning(f'''Explanation : These are on purpose, and they come from the interaction of
+                    multi-threading and forking. When Python forks, the task-scheduler has a linked-list
+                    of tasks to execute, but there is a global lock held that protects the thread-local state.
+                    We can't free the list without accessing the global thread-local storage singleton,
+                    and that is protected by a lock which is now held by a thread that is in a different
+                    process, so it will never be unlocked in the child.
+                    As a work-around we intentionally leak the task-scheduler and replace it with a new one, in this method:
+                    https://github.com/man-group/ArcticDB/blob/master/cpp/arcticdb/async/task_scheduler.cpp#L34
+
+                    It's actually due to a bug in folly, because the lock around the thread-local
+                    storage manager has a compile-time token that should be used to differentiate it
+                    from other locks, but it has been constructed with type void, as have other locks.
+                    It's possible they might fix it at some point, in which case we can free the memory.
+                    Or we do have a vague intention to move away from folly for async if we
+                    find something better.
+
+                    Great that it is catching this, as it's the one case in the whole project where I know
+                    for certain that it does leak memory (and only because there's no alternative).''')
+                return False
+
             pass
         return True
 
@@ -772,8 +796,7 @@ def prepare_head_tails_symbol(lmdb_library_any):
         {'library_options': LibraryOptions(rows_per_segment=233, columns_per_segment=197, dynamic_schema=True, encoding_version=EncodingVersion.V2)},
         {'library_options': LibraryOptions(rows_per_segment=99, columns_per_segment=99, dynamic_schema=False, encoding_version=EncodingVersion.V1)}
     ], indirect=True)
-    @pytest.mark.limit_leaks(location_limit="52 KB", filter_fn=is_relevant)
-
+    @pytest.mark.limit_leaks(location_limit="52 KB" if not LINUX else "380 KB", filter_fn=is_relevant)
     def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
         """
         This test aims to check whether the `head` and `tail` functions leak memory.
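
Usage sketch (not part of the patch series): the `DFGenerator.generate_wide_dataframe` helper introduced in PATCH 1 and the head/tail pattern exercised by the test can be tried outside the memray harness roughly as below. The LMDB URI, library name and symbol name are illustrative assumptions, not values taken from the patches.

    import pandas as pd
    from arcticdb import Arctic
    from arcticdb.util.utils import DFGenerator

    # Assumed local LMDB-backed Arctic instance; any supported storage URI would do.
    ac = Arctic("lmdb:///tmp/arcticdb_head_tail_demo")
    lib = ac.get_library("demo_lib", create_if_missing=True)

    # 200 rows, 100 columns of mixed int/uint/float dtypes plus 10 string columns,
    # indexed from the epoch at 1-second frequency (mirrors the fixture in PATCH 1).
    df = DFGenerator.generate_wide_dataframe(num_rows=200, num_cols=100, num_string_cols=10,
                                             start_time=pd.Timestamp(0), seed=64578)
    lib.write("demo_symbol", df)

    head_df = lib.head("demo_symbol", n=5).data   # first 5 rows
    tail_df = lib.tail("demo_symbol", n=5).data   # last 5 rows
    print(head_df.shape, tail_df.shape)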