Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memray tests for memory leaks for head() and tail() #2199

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions python/arcticdb/util/utils.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have some remarks on the code here but won't comment and leave it for the other review.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use this PR for those: #2185

Original file line number Diff line number Diff line change
Expand Up @@ -553,3 +553,129 @@ def generate_random_dataframe(cls, rows: int, cols: int, indexed: bool = True, s
gen.add_timestamp_index("index", "s", pd.Timestamp(0))
return gen.generate_dataframe()


@classmethod
def generate_random_int_dataframe(cls, start_name_prefix: str,
                                  num_rows: int, num_cols: int,
                                  dtype: "ArcticIntType" = np.int64, min_value: int = None, max_value: int = None,
                                  seed: int = 3432) -> pd.DataFrame:
    """
    Generate a `num_rows` x `num_cols` DataFrame of random integers of `dtype`.

    To be used to generate a large number of same-type columns, when generation
    time is critical.

    Parameters
    ----------
    start_name_prefix : prefix for generated column names ("<prefix>_<n>").
    num_rows, num_cols : shape of the resulting frame.
    dtype : NumPy integer type of the generated values.
    min_value, max_value : inclusive lower / exclusive upper bound for values;
        default to the representable range of `dtype`, clamped to the
        platform's native int range.
    seed : seed for NumPy's global RNG, for reproducibility.
    """
    np.random.seed(seed=seed)
    platform_int_info = np.iinfo("int_")
    iinfo = np.iinfo(dtype)
    # Clamp defaults to the platform word size: np.random.randint rejects
    # bounds that do not fit the platform's native integer.
    if min_value is None:
        min_value = max(iinfo.min, platform_int_info.min)
    if max_value is None:
        max_value = min(iinfo.max, platform_int_info.max)

    data = np.random.randint(min_value, max_value, size=(num_rows, num_cols), dtype=dtype)
    columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]

    return pd.DataFrame(data=data, columns=columns)

@classmethod
def generate_random_float_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
                                    dtype: "ArcticFloatType" = np.float64,
                                    min_value: float = None, max_value: float = None, round_at: int = None,
                                    seed: int = 54675) -> pd.DataFrame:
    """
    Generate a `num_rows` x `num_cols` DataFrame of random floats of `dtype`,
    optionally rounded to `round_at` decimal places.

    To be used to generate a large number of same-type columns, when generation
    time is critical.

    Parameters
    ----------
    start_name_prefix : prefix for generated column names ("<prefix>_<n>").
    num_rows, num_cols : shape of the resulting frame.
    dtype : NumPy float type of the generated values.
    min_value, max_value : bounds for the uniform distribution; defaults are
        restricted to the float32 range (and to +/-1e307), because larger
        magnitudes trigger overflow in np.random.uniform.
    round_at : number of decimals to round to; None means no rounding.
    seed : seed for NumPy's global RNG, for reproducibility.
    """
    # Get the minimum and maximum values for np.float32; used as the default
    # bounds regardless of `dtype` to stay clear of uniform() overflow.
    info = np.finfo(np.float32)
    _max = info.max
    _min = info.min
    np.random.seed(seed)
    if min_value is None:
        min_value = max(-1e307, -sys.float_info.max, _min)
    if max_value is None:
        max_value = min(1e307, sys.float_info.max, _max)
    if round_at is None:
        data = np.random.uniform(min_value, max_value, size=(num_rows, num_cols)).astype(dtype)
    else:
        data = np.round(np.random.uniform(min_value, max_value,
                                          size=(num_rows, num_cols)), round_at).astype(dtype)

    columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]

    return pd.DataFrame(data=data, columns=columns)

@classmethod
def generate_random_strings_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
                                      column_sizes=None, seed: int = 4543):
    """
    Generate a `num_rows` x `num_cols` DataFrame of random strings.

    To be used to generate a large number of same-type columns, when generation
    time is critical.
    If `column_sizes` is not supplied, a default string length of 10 is used
    for every column.
    """
    if column_sizes is None:
        column_sizes = [10] * num_cols
    np.random.seed(seed=seed)
    # Build row by row, left to right, so the random_string call order is
    # stable for a given seed.
    rows = []
    for _ in range(num_rows):
        rows.append([random_string(column_sizes[col_idx])
                     for col_idx in range(num_cols)])

    column_names = [f"{start_name_prefix}_{idx}" for idx in range(num_cols)]
    return pd.DataFrame(data=rows, columns=column_names)

@classmethod
def generate_wide_dataframe(cls, num_rows: int, num_cols: int,
                            num_string_cols: int,
                            start_time: pd.Timestamp = None,
                            freq: Union[str, timedelta, pd.Timedelta, pd.DateOffset] = 's',
                            seed=23445):
    """
    Generates, as fast as possible, a dataframe with the specified number of columns,
    using numpy random-array generation.

    The non-string columns are split evenly across the 10 supported numeric
    dtypes (int8/16/32/64, uint8/16/32/64, float32/64); any remainder goes to
    the int64 group. As string generation is slowest, always be mindful to
    pass a number between 1-1000 max for `num_string_cols`.

    If `start_time` is supplied the generated dataframe also gets a datetime
    index starting there, spaced by `freq`.
    """

    # Divide the numeric columns across the 10 unique frame types; the
    # remainder is folded into the int64 frame below.
    cols, mod = divmod(num_cols - num_string_cols, 10)

    int_frame1 = cls.generate_random_int_dataframe("int8", num_rows=num_rows, num_cols=cols,
                                                   dtype=np.int8, seed=seed)

    int_frame2 = cls.generate_random_int_dataframe("int16", num_rows=num_rows, num_cols=cols,
                                                   dtype=np.int16, seed=seed)

    int_frame3 = cls.generate_random_int_dataframe("int32", num_rows=num_rows, num_cols=cols,
                                                   dtype=np.int32, seed=seed)

    int_frame4 = cls.generate_random_int_dataframe("int64", num_rows=num_rows, num_cols=cols + mod,
                                                   dtype=np.int64, seed=seed)

    uint_frame1 = cls.generate_random_int_dataframe("uint8", num_rows=num_rows, num_cols=cols,
                                                    dtype=np.uint8, seed=seed)

    uint_frame2 = cls.generate_random_int_dataframe("uint16", num_rows=num_rows, num_cols=cols,
                                                    dtype=np.uint16, seed=seed)

    uint_frame3 = cls.generate_random_int_dataframe("uint32", num_rows=num_rows, num_cols=cols,
                                                    dtype=np.uint32, seed=seed)

    uint_frame4 = cls.generate_random_int_dataframe("uint64", num_rows=num_rows, num_cols=cols,
                                                    dtype=np.uint64, seed=seed)

    float_frame1 = cls.generate_random_float_dataframe("float32", num_rows=num_rows, num_cols=cols,
                                                       dtype=np.float32, seed=seed)

    float_frame2 = cls.generate_random_float_dataframe("float64", num_rows=num_rows, num_cols=cols,
                                                       dtype=np.float64, seed=seed)

    # NOTE(review): the string frame uses its own default seed rather than
    # `seed` — presumably intentional; confirm before relying on full
    # frame-level reproducibility from `seed` alone.
    str_frame = cls.generate_random_strings_dataframe("str", num_rows=num_rows, num_cols=num_string_cols)

    frame: pd.DataFrame = pd.concat([int_frame1, int_frame2, int_frame3, int_frame4,
                                     uint_frame1, uint_frame2, uint_frame3, uint_frame4,
                                     float_frame1, float_frame2, str_frame], axis=1)  # Concatenate horizontally

    if start_time is not None:
        # Do not shadow the builtin `range`
        index_range = pd.date_range(start=start_time, periods=frame.shape[0], freq=freq, name='index')
        frame.index = index_range

    return frame
16 changes: 16 additions & 0 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,22 @@ def lmdb_storage(tmp_path) -> Generator[LmdbStorageFixture, None, None]:
def lmdb_library(lmdb_storage, lib_name) -> Library:
return lmdb_storage.create_arctic().create_library(lib_name)

@pytest.fixture
def lmdb_library_any(lmdb_storage, lib_name, request) -> Library:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does any mean in the name? Shouldn't it be lmdb_library_with_options or something like that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see next comment

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a fixture named version_store_factory (the naming is not great though) which does the same job. We should reuse that and make it return a V2 library instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, at least have a similar design and API to version_store_factory but perhaps a new fixture than returns a Library like library_factory. There's no reason for them to be meaningfully different to each other

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lmdb_library now is the only remaining. I figured out that if no params are passed then lmdb_library will continue to function like it used to,

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as this test is slow test it must run once hence version_store_factory is not opt.

"""
Allows passing library creation parameters as parameters of the test or other fixture.
Example:


@pytest.mark.parametrize("lmdb_library_any", [
{'library_options': LibraryOptions(rows_per_segment=100, columns_per_segment=100)}
], indirect=True)
def test_my_test(lmdb_library_any):
.....
"""
params = request.param if hasattr(request, 'param') else {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the example above this shouldn't be `param` but `library_options`

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, param will contain all parameters library_options and other if given

yield lmdb_storage.create_arctic().create_library(name=lib_name, **params)


@pytest.fixture
def lmdb_library_dynamic_schema(lmdb_storage, lib_name) -> Library:
Expand Down
130 changes: 128 additions & 2 deletions python/tests/stress/arcticdb/version_store/test_mem_leaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import time
import pandas as pd

from typing import Generator, Tuple
from typing import Generator, List, Tuple
from arcticdb.encoding_version import EncodingVersion
from arcticdb.options import LibraryOptions
from arcticdb.util.test import get_sample_dataframe, random_string
from arcticdb.util.utils import DFGenerator
from arcticdb.version_store.library import Library, ReadRequest
from arcticdb.version_store.processing import QueryBuilder
from arcticdb.version_store._store import NativeVersionStore
from tests.util.mark import MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK
from tests.util.mark import LINUX, MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK


logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -590,6 +593,30 @@ def is_relevant(stack: Stack) -> bool:
# do something to check if we need this to be added
# as mem leak
# print(f"SAMPLE >>> {frame.filename}:{frame.function}[{frame.lineno}]")
frame_info_str = f"{frame.filename}:{frame.function}:[{frame.lineno}]"

if "folly::CPUThreadPoolExecutor::CPUTask" in frame_info_str:
logger.warning(f"Frame excluded : {frame_info_str}")
logger.warning(f'''Explanation : These are on purpose, and they come from the interaction of
multi-threading and forking. When Python forks, the task-scheduler has a linked-list
of tasks to execute, but there is a global lock held that protects the thread-local state.
We can't free the list without accessing the global thread-local storage singleton,
and that is protected by a lock which is now held be a thread that is in a different
process, so it will never be unlocked in the child. As a work-around we intentionally
leak the task-scheduler and replace it with a new one, in this method:
https://github.com/man-group/ArcticDB/blob/master/cpp/arcticdb/async/task_scheduler.cpp#L34

It's actually due to a bug in folly, because the lock around the thread-local
storage manager has a compile-time token that should be used to differentiate it
from other locks, but it has been constructed with type void as have other locks.
It's possible they might fix it at some point in which case we can free the memory.
Or we do have a vague intention to move away from folly for async if we
find something better

Great that it is catching this, as it's the one case in the whole project where I know
for certain that it does leak memory (and only because there's no alternative''')
return False

pass
return True

Expand Down Expand Up @@ -723,7 +750,106 @@ def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_):
logger.info("Test starting")
st = time.time()
data: pd.DataFrame = lib.read(symbol).data
lib.head
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not calling the head function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some refactoring cleanup mess — removed; there should not be any line there. Thanks for spotting this

del data
logger.info(f"Test took : {time.time() - st}")

gc.collect()

@pytest.fixture
def prepare_head_tails_symbol(lmdb_library_any):
"""
This fixture is part of test `test_mem_leak_head_tail_memray`

It creates a symbol with several versions and snapshot for each version.
It inserts dataframes which are large given the segment size of library.
And if the dynamic schema is used each version has more columns than previous version

Should not be reused
"""
lib: Library = lmdb_library_any
opts: LibraryOptions = lib.options()
total_number_columns = 1002
symbol = "asdf12345"
num_rows_list = [279,199,1,350,999,0,1001]
snapshot_names = []
for rows in num_rows_list:
st = time.time()
df = DFGenerator.generate_wide_dataframe(num_rows=rows, num_cols=total_number_columns, num_string_cols=25,
start_time=pd.Timestamp(0),seed=64578)
lib.write(symbol,df)
snap = f"{symbol}_{rows}"
lib.snapshot(snap)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confused. Why is there a snapshot

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

head and tail has 'as_of' parameter. Later snapshots are used for that purpose ...
store.head(n=rows, as_of=snap, symbol=symbol
store.tail(n=rows, as_of=ver, symbol=symbol, columns=selected_columns)
for versions.

Thus I believe this covers the maximum number of code paths

snapshot_names.append(snap)
logger.info(f"Generated {rows} in {time.time() - st} sec")
if opts.dynamic_schema:
# For dynamic schema we will increase number of columns each iteration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment repeats the code. More appropriate would be to comment why we are doing this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, very good suggestion!

total_number_columns += 20
logger.info(f"Total number of columns increased to {total_number_columns}")

all_columns = df.columns.to_list()
yield (lib, symbol, num_rows_list, snapshot_names, all_columns)
lib.delete(symbol=symbol)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt that this will be called at all

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why, Vasil?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is always called. That is actually one of the core functionalities of fixtures. They provide a way to do something before (setup) and after (cleanup) the test, so that you isolate test logic only in the test method. And that is the reason that they yield. The cleanup phase is also protected from any problems that might arise during test execution — i.e. it will always execute


@MEMRAY_TESTS_MARK
@pytest.mark.parametrize("lmdb_library_any", [
{'library_options': LibraryOptions(rows_per_segment=233, columns_per_segment=197, dynamic_schema=True, encoding_version=EncodingVersion.V2)},
{'library_options': LibraryOptions(rows_per_segment=99, columns_per_segment=99, dynamic_schema=False, encoding_version=EncodingVersion.V1)}
], indirect=True)
@pytest.mark.limit_leaks(location_limit="52 KB" if not LINUX else "380 KB", filter_fn=is_relevant)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invert the if statement to avoid double negative

def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
"""
This test aims to test `head` and `tail` functions if they do leak memory.
The creation of initial symbol (test environment precondition) is in specialized fixture
so that memray does not detect memory there as this will slow the process many times
"""

store: NativeVersionStore = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only way to define typehints. Thus it can remain as typehints even if we do not enforce them are helpful when using IDE with intelisense

(store, symbol, num_rows_list, snapshot_names, all_columns) = prepare_head_tails_symbol

start_test = time.time()
min_rows = min(num_rows_list)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used in the code

max_rows = max(num_rows_list)

np.random.seed(959034)
# constructing a list of head and tail rows to be selected
num_rows_to_select=[]
important_values = [0, 1, 0 -1, 2, -2, max_rows, -max_rows ]
num_rows_to_select.extend(important_values)
num_rows_to_select.extend(np.random.randint(low=5, high=99, size=7)) # add 7 more random values
# number of iterations will be the list length/size
iterations = len(num_rows_to_select)
# constructing a random list of values for snapshot names for each iteration
snapshots_list: List[str] = np.random.choice(snapshot_names, iterations)
# constructing a random list of values for versions names for each iteration
versions_list: List[int] = np.random.randint(0, len(num_rows_list) - 1, iterations)
# constructing a random list of values for column selection for each iteration
number_columns_list: List[int] = np.random.randint(0, len(all_columns)-1, iterations)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The column parameter takes a list of column names not a list of indexes

Copy link
Collaborator Author

@grusev grusev Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to number_columns_for_selection_list edited the comment also


count = 0
for rows in num_rows_to_select:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand what this loop is trying to do and as Vasil says below it looks like it is actually incorrect

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment

    # We will execute several time all head/tail operations with specific number of columns.
    # the number of columns consist of random columns and boundary cases see definition above

and in the code above:

constructing a list of head and tail rows to be selected

    num_rows_to_select = []
    important_values = [0, 1, 0 -1, 2, -2, max_rows, -max_rows ]
    num_rows_to_select.extend(important_values)
    num_rows_to_select.extend(np.random.randint(low=5, high=99, size=7)) # add 7 more random values

number_columns = np.random.choice(all_columns, number_columns_list[count]).tolist()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the replace parameter to select a column once

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hadn't even considered reading it, as I had no idea that `replace` would help achieve the uniqueness needed.

number_columns = list(set(number_columns)) # take only unique columns
snap = snapshots_list[count]
ver = int(versions_list[count])
logger.info(f"rows {rows} / snapshot {snap}")
df1: pd.DataFrame = store.head(n=rows, as_of=snap, symbol=symbol).data
df2: pd.DataFrame = store.tail(n=rows, as_of=snap, symbol=symbol).data
df3: pd.DataFrame = store.head(n=rows, as_of=ver, symbol=symbol, columns=number_columns).data
difference = list(set(df3.columns.to_list()).difference(set(number_columns)))
assert len(difference) == 0, f"Columns not included : {difference}"
Comment on lines +839 to +840
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this assert here. I have two arguments:

  1. It's obviously not helping and not checking any correctness as it's not doing anything and the test passes. You are passing number_columns_list which is a list of ints and all columns are named something like "int8_0" according to generate_wide_dataframe. Thus the read returns only the index column. The column list is empty and the difference of empty set and something else is always 0.
  2. Separation of concerns. We can add unit tests to check the correctness of the columns parameter (and in fact we have such tests) but this tests is a memory leak test. If we try checking this why not go all the way and check that head/tail return the correct data, etc... This leaves more questions than answers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was and is working, but a poor choice of words led to the assumption that no columns are selected — they are selected. Now the names are clear about their meaning.

As for item 2, with tests this is not true. With tests you can (and perhaps should) assume that a problem is lurking everywhere. Thus adding checks at places to catch a problem at the earliest is always advised, as longer tests will otherwise continue to run with the problem present and you get a PASS instead of a fail.

The only exclusion to this principle is performance tests — there minimal checks are added, for the obvious reason that they carry a performance penalty

df4 = store.tail(n=rows, as_of=ver, symbol=symbol, columns=number_columns).data
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like how some of the declarations have type annotations and some don't and it's completely arbitrary which ones do have annotations. We should be consistent in our code style otherwise it's hard to follow the code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok with that, but not all of us will use type hints. Thus our code is already hard to follow . Intelisense of IDEs use type hints, this it helps in many cases write and maintain the code better even without linters.

Thus a working for both approaches is the intersection of needs - add where there is value - complex objects, no need to ad where there is no value

We will not have linter anytime soon so making all is not adding value.

difference = list(set(df4.columns.to_list()).difference(set(number_columns)))
assert len(difference) == 0, f"Columns not included : {difference}"

logger.info(f"Iteration {count} / {iterations} completed")
count += 1
del number_columns, df1, df2, df3, df4

del store, symbol, num_rows_list, snapshot_names, all_columns
del num_rows_to_select, important_values, snapshots_list, versions_list, number_columns_list
gc.collect()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the del and the collect()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just making sure nothing is left from this python test. Thus remaining leaks should be searched outside.

time.sleep(10) # collection is not immediate
logger.info(f"Test completed in {time.time() - start_test}")


Loading