Skip to content

Commit

Permalink
Break the circular reference causing a memory leak (#1115)
Browse files Browse the repository at this point in the history
* Fixes memory leak #1114 by releasing the references to the MRC Pipeline & Executor on stop.
* Numerous pylint fixes for `morpheus/pipeline/pipeline.py`
* Skip `tests/test_multi_segment.py::test_multi_segment_bad_data_type` due to nv-morpheus/MRC#360

fixes #1114

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Christopher Harris (https://github.com/cwharris)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1115
  • Loading branch information
dagardner-nv authored Aug 30, 2023
1 parent b56703f commit 4ae80d0
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 14 deletions.
6 changes: 5 additions & 1 deletion morpheus/io/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def __init__(self, storage_type: str = 'in_memory', file_format: str = 'parquet'
:param file_format: Specifies the file format to be used. Can be either 'parquet' or 'csv'.
"""

# Define these early so that they are defined even if we raise an exception, this ensures that we don't get an
# attribute error in the __del__ method.
self._storage_dir = None
self._storage_type = None

if (storage_type not in self.VALID_STORAGE_TYPES):
raise ValueError(f"Invalid storage_type '{storage_type}'")

Expand All @@ -60,7 +65,6 @@ def __init__(self, storage_type: str = 'in_memory', file_format: str = 'parquet'
self._fs = fsspec.filesystem('file')
self._manifest = {}
self._records = {}
self._storage_dir = None
self._storage_type = storage_type
self._total_rows = 0

Expand Down
30 changes: 18 additions & 12 deletions morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Pipeline():
Parameters
----------
c : `morpheus.config.Config`
config : `morpheus.config.Config`
Pipeline configuration instance.
"""
Expand All @@ -61,6 +61,7 @@ def __init__(self, config: Config):
self._source_count: int = None # Maximum number of iterations for progress reporting. None = Unknown/Unlimited

self._id_counter = 0
self._num_threads = config.num_threads

# Complete set of nodes across segments in this pipeline
self._stages: typing.Set[Stage] = set()
Expand All @@ -71,10 +72,6 @@ def __init__(self, config: Config):
# Dictionary containing segment information for this pipeline
self._segments: typing.Dict = defaultdict(lambda: {"nodes": set(), "ingress_ports": [], "egress_ports": []})

self._exec_options = mrc.Options()
self._exec_options.topology.user_cpuset = f"0-{config.num_threads - 1}"
self._exec_options.engine_factories.default_engine_type = mrc.core.options.EngineType.Thread

# Set the default channel size
mrc.Config.default_channel_size = config.edge_buffer_size

Expand All @@ -87,7 +84,6 @@ def __init__(self, config: Config):
self._is_started = False

self._mrc_executor: mrc.Executor = None
self._mrc_pipeline: mrc.Pipeline = None

@property
def is_built(self) -> bool:
Expand Down Expand Up @@ -233,9 +229,13 @@ def build(self):

logger.info("====Registering Pipeline====")

self._mrc_executor = mrc.Executor(self._exec_options)
exec_options = mrc.Options()
exec_options.topology.user_cpuset = f"0-{self._num_threads - 1}"
exec_options.engine_factories.default_engine_type = mrc.core.options.EngineType.Thread

self._mrc_executor = mrc.Executor(exec_options)

self._mrc_pipeline = mrc.Pipeline()
mrc_pipeline = mrc.Pipeline()

def inner_build(builder: mrc.Builder, segment_id: str):
logger.info("====Building Segment: %s====", segment_id)
Expand Down Expand Up @@ -281,17 +281,17 @@ def inner_build(builder: mrc.Builder, segment_id: str):
segment_egress_ports = segment["egress_ports"]
segment_inner_build = partial(inner_build, segment_id=segment_id)

self._mrc_pipeline.make_segment(segment_id, [port_info["port_pair"] for port_info in segment_ingress_ports],
[port_info["port_pair"] for port_info in segment_egress_ports],
segment_inner_build)
mrc_pipeline.make_segment(segment_id, [port_info["port_pair"] for port_info in segment_ingress_ports],
[port_info["port_pair"] for port_info in segment_egress_ports],
segment_inner_build)

logger.info("====Building Pipeline Complete!====")
self._is_build_complete = True

# Finally call _on_start
self._on_start()

self._mrc_executor.register_pipeline(self._mrc_pipeline)
self._mrc_executor.register_pipeline(mrc_pipeline)

self._is_built = True

Expand All @@ -318,6 +318,7 @@ def stop(self):
self._mrc_executor.stop()

logger.info("====Pipeline Stopped====")
self._on_stop()

async def join(self):
"""
Expand Down Expand Up @@ -347,6 +348,11 @@ async def join(self):
for stage in list(self._stages):
await stage.join()

self._on_stop()

def _on_stop(self):
    # Release the reference to the MRC executor so it (and everything it keeps alive)
    # can be garbage collected — per the commit message this breaks the circular
    # reference behind memory leak #1114.
    self._mrc_executor = None

async def _build_and_start(self):

if (not self.is_built):
Expand Down
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import ctypes
import gc
import importlib
import logging
import os
Expand Down Expand Up @@ -503,6 +504,19 @@ def reset_plugins(reset_plugin_manger, reset_global_stage_registry):
yield


@pytest.fixture(scope="function")
def disable_gc():
    """
    Disable automatic garbage collection and enable GC debug stats for the duration of the test.

    This is useful for tests that require explicit control over when garbage collection occurs.
    The teardown is wrapped in ``finally`` so collection is re-enabled and debug output is
    turned back off even if the test errors out mid-run.
    """
    gc.set_debug(gc.DEBUG_STATS)
    gc.disable()
    try:
        yield
    finally:
        # Restore default GC behavior for subsequent tests.
        gc.set_debug(0)
        gc.enable()


def wait_for_camouflage(host="localhost", port=8000, timeout=5):

start_time = time.time()
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions tests/test_multi_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def test_linear_boundary_stages(config, filter_probs_df):
assert_results(comp_stage.get_results())


@pytest.mark.skip(reason="Skipping due to MRC issue #360")
@pytest.mark.use_cudf
def test_multi_segment_bad_data_type(config, filter_probs_df):
with pytest.raises(RuntimeError):
Expand Down
83 changes: 83 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import typing

import pytest

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.utils.type_aliases import DataFrameType


class SourceTestStage(InMemorySourceStage):
    """
    In-memory source stage that invokes a caller-supplied callback from its destructor,
    letting tests detect whether the stage was actually garbage collected.
    """

    def __init__(self,
                 config,
                 dataframes: typing.List[DataFrameType],
                 destructor_cb: typing.Callable[[], None],
                 repeat: int = 1):
        # Assign the callback before calling the base constructor so that __del__ doesn't
        # raise an AttributeError if super().__init__ raises (same pattern this commit
        # applies in morpheus/io/data_manager.py).
        self._destructor_cb = destructor_cb
        super().__init__(config, dataframes, repeat)

    @property
    def name(self) -> str:
        # Unique stage name used for pipeline bookkeeping/logging.
        return "test-source"

    def __del__(self):
        # Signal the owning test that this stage has been destroyed.
        self._destructor_cb()


class SinkTestStage(InMemorySinkStage):
    """
    In-memory sink stage that invokes a caller-supplied callback from its destructor,
    letting tests detect whether the stage was actually garbage collected.
    """

    def __init__(self, config, destructor_cb: typing.Callable[[], None]):
        # Assign the callback before calling the base constructor so that __del__ doesn't
        # raise an AttributeError if super().__init__ raises (same pattern this commit
        # applies in morpheus/io/data_manager.py).
        self._destructor_cb = destructor_cb
        super().__init__(config)

    @property
    def name(self) -> str:
        # Unique stage name used for pipeline bookkeeping/logging.
        return "test-sink"

    def __del__(self):
        # Signal the owning test that this stage has been destroyed.
        self._destructor_cb()


def _run_pipeline(config: Config, filter_probs_df: DataFrameType, update_state_dict: typing.Callable[[str], None]):
    """Build and run a minimal source -> sink pipeline whose stages report their destruction via callbacks."""
    pipeline = LinearPipeline(config)

    source = SourceTestStage(config, [filter_probs_df], destructor_cb=lambda: update_state_dict("source"))
    pipeline.set_source(source)

    sink = SinkTestStage(config, destructor_cb=lambda: update_state_dict("sink"))
    pipeline.add_stage(sink)

    pipeline.run()


@pytest.mark.use_cudf
def test_destructors_called(config: Config, filter_probs_df: DataFrameType):
    """
    Test to ensure that the destructors of stages are called (issue #1114).
    """
    # Flags flipped to True by each stage's __del__ callback.
    state_dict = {"source": False, "sink": False}

    def update_state_dict(key: str):
        # No `nonlocal` needed: the dict is mutated in place, never rebound.
        state_dict[key] = True

    _run_pipeline(config, filter_probs_df, update_state_dict)

    # The stages may only become unreachable via a reference cycle after the run,
    # so force a collection pass before asserting the destructors fired.
    gc.collect()
    assert state_dict["source"]
    assert state_dict["sink"]

0 comments on commit 4ae80d0

Please sign in to comment.