Skip to content

Commit

Permalink
Add failure tests for stealing subgraphs. Minor fix in pipeline valid…
Browse files Browse the repository at this point in the history
…ation. (NVIDIA#5518)

Signed-off-by: Michal Zientkiewicz <[email protected]>
  • Loading branch information
mzient authored Jun 11, 2024
1 parent 9420fb8 commit 398e27d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
4 changes: 3 additions & 1 deletion dali/python/nvidia/dali/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,9 @@ def __exit__(self, type, value, traceback):
def _require_unique_names(self):
ops_by_name = {}
for op in self._ops:
ops = ops_by_name.get(op.name, [])
ops = ops_by_name.get(op.name, None)
if ops is None:
ops = ops_by_name[op.name] = []
ops.append(op)
duplicate = {}
foreign = False
Expand Down
19 changes: 19 additions & 0 deletions dali/test/python/checkpointing/test_dali_checkpointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from nvidia.dali.auto_aug import trivial_augment as ta
from reader.test_numpy import is_gds_supported
from nose.plugins.attrib import attr
from nose_utils import assert_raises


reader_signed_off = create_sign_off_decorator()
random_signed_off = create_sign_off_decorator()
Expand Down Expand Up @@ -1185,6 +1187,23 @@ def pipeline():
compare_pipelines(pipe2, pipe3, batch_size, 5)


def test_unsupported_dangling_subgraph():
es = fn.external_source("asdf")

@pipeline_def(batch_size=1, num_threads=1, device_id=None, enable_checkpointing=True)
def pipe(arg):
return arg + 0

p = pipe(es)

with assert_raises(
RuntimeError,
glob="The pipeline does not support checkpointing*"
"because it contains operator*outside the pipeline*",
):
p.build()


unsupported_readers = [
"experimental.readers.fits",
]
Expand Down
18 changes: 18 additions & 0 deletions dali/test/python/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2204,3 +2204,21 @@ def test_regression_without_current_pipeline2():
data = fn.external_source(source=[1, 2, 3], batch=False, cycle=True)
pipe.set_outputs(data.gpu())
pipe.build()


def test_subgraph_stealing():
p1 = Pipeline(batch_size=1, device_id=None, num_threads=1)
p2 = Pipeline(batch_size=1, device_id=None, num_threads=1)
with p1:
es1 = fn.external_source(source=[1, 2, 3], batch=False)
x = es1 + 1
p1.set_outputs(x)
with p2:
es2 = fn.external_source(source=[1, 2, 3], batch=False)
p2.set_outputs(x + es2)
p1.build()
with assert_raises(
RuntimeError,
glob="The pipeline is invalid because it contains operators with non-unique names",
):
p2.build()

0 comments on commit 398e27d

Please sign in to comment.