
[BUG]: Python source stages not shutting down if another stage raises an exception #1838

Closed
2 tasks done
dagardner-nv opened this issue Aug 9, 2024 · 0 comments · Fixed by #1881
Labels: bug (Something isn't working)

dagardner-nv (Contributor) commented Aug 9, 2024

Version

24.10

Which installation method(s) does this occur on?

Source

Describe the bug.

This primarily affects sources that run indefinitely (HttpClientSourceStage, AppShieldSourceStage, RSSSourceStage). When another stage raises an exception, these sources keep running instead of shutting down.

C++ sources avoid this issue because they have access to the subscriber object, and a well-behaved source stage checks subscriber.is_subscribed() at the beginning of each poll iteration.
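The polling pattern described above can be modeled in Python for illustration. This is a minimal sketch, not MRC's API: Subscriber, on_next(), unsubscribe() and poll_source() are hypothetical stand-ins for the C++ subscriber object and a C++ source, chosen only to show a source that checks the subscription at the top of every poll iteration and exits once it is gone.

```python
import time
from typing import Callable, List


class Subscriber:
    """Hypothetical stand-in for the C++ subscriber object a source receives."""

    def __init__(self) -> None:
        self._subscribed = True
        self.received: List[int] = []

    def is_subscribed(self) -> bool:
        return self._subscribed

    def unsubscribe(self) -> None:
        # In a real pipeline this would happen when another stage fails
        self._subscribed = False

    def on_next(self, value: int) -> None:
        self.received.append(value)


def poll_source(subscriber: Subscriber, fetch: Callable[[], int], sleep_time: float = 0.0) -> int:
    """Poll indefinitely until the subscriber goes away; return the number of values emitted."""
    emitted = 0
    while True:
        # Check the subscriber at the beginning of each poll iteration
        if not subscriber.is_subscribed():
            return emitted
        subscriber.on_next(fetch())
        emitted += 1
        time.sleep(sleep_time)


# Usage: simulate another stage failing while the third value is being produced
sub = Subscriber()
values = iter(range(100))


def fetch() -> int:
    value = next(values)
    if value == 2:
        sub.unsubscribe()
    return value


print(poll_source(sub, fetch))  # 3
print(sub.received)  # [0, 1, 2]
```

The loop ends on the first iteration after the subscription is dropped, even though the underlying data source (range(100) here) is far from exhausted.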

Python sources don't have access to the subscriber. The pymrc C++ wrapper (python/mrc/_pymrc/src/segment.cpp) can check it, but there are two problems:

  1. It doesn't stop iterating when is_subscribed() returns false (easily fixed).
  2. Each iteration of for (auto next_val : iter_wrapper) blocks until the Python source yields its next value, so the C++ code can't check the subscriber while the source is busy (e.g., sleeping between polls).
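Both problems can be seen in a rough Python model of the wrapper loop. This is a hypothetical sketch: drive_generator_source, the is_subscribed callable and emit are illustrative names, not pymrc's API. The subscription check can only run after the generator hands back a value (problem 2), and addressing problem 1 means closing the generator and leaving the loop once the check fails, which is roughly the behavior described for the partial fix in MRC PR #493.

```python
from typing import Callable, Generator, List


def drive_generator_source(source: Generator[int, None, None],
                           is_subscribed: Callable[[], bool],
                           emit: Callable[[int], None]) -> None:
    """Hypothetical model of the wrapper loop `for (auto next_val : iter_wrapper)`."""
    for next_val in source:  # blocks until the Python source yields (problem 2)
        if not is_subscribed():
            # Fix for problem 1: stop pulling values instead of continuing to iterate.
            # close() raises GeneratorExit at the source's yield so its finally blocks run.
            source.close()
            return
        emit(next_val)


cleanup: List[str] = []
emitted: List[int] = []
subscribed = [True]


def endless_source() -> Generator[int, None, None]:
    """Runs indefinitely, like a polling source; only stops when closed."""
    n = 0
    try:
        while True:
            yield n
            n += 1
    finally:
        cleanup.append("closed")


def emit(value: int) -> None:
    emitted.append(value)
    if value == 2:
        subscribed[0] = False  # simulate another stage raising an exception


drive_generator_source(endless_source(), lambda: subscribed[0], emit)
print(emitted)  # [0, 1, 2]
print(cleanup)  # ['closed']
```

Note that the source still has to produce value 3 before the loop notices the lost subscription; if the source were blocked (for example sleeping or waiting on a request), the check would be delayed just as long.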

Ideal solution:

  1. The MRC executor should accept an optional on_stage_change callback.
  2. pymrc should wrap this, allowing the callback to be implemented in Python.
  3. Morpheus' pipeline.py should supply a callback that calls Pipeline.stop() when a stage's status changes to a terminal state.
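The proposed callback flow can be sketched in Python. Everything here is hypothetical: the on_stage_change name comes from the proposal above, but StageState, Executor, notify() and this Pipeline class are illustrative stand-ins rather than MRC or Morpheus APIs, and which states count as terminal is an assumption.

```python
import enum
from typing import Callable, Optional


class StageState(enum.Enum):
    """Hypothetical stage states; the terminal set below is an assumption."""
    RUNNING = enum.auto()
    COMPLETED = enum.auto()
    ERROR = enum.auto()


TERMINAL_STATES = {StageState.COMPLETED, StageState.ERROR}


class Executor:
    """Stand-in for the MRC executor, accepting an optional on_stage_change callback."""

    def __init__(self, on_stage_change: Optional[Callable[[str, StageState], None]] = None) -> None:
        self._on_stage_change = on_stage_change

    def notify(self, stage_name: str, state: StageState) -> None:
        # Called by the executor whenever a stage changes state
        if self._on_stage_change is not None:
            self._on_stage_change(stage_name, state)


class Pipeline:
    """Stand-in for Morpheus' Pipeline; stop() would tell every source to shut down."""

    def __init__(self) -> None:
        self.stopped = False
        self.executor = Executor(on_stage_change=self._on_stage_change)

    def stop(self) -> None:
        self.stopped = True

    def _on_stage_change(self, stage_name: str, state: StageState) -> None:
        # Any stage reaching a terminal state (e.g. raising an exception) stops the pipeline
        if state in TERMINAL_STATES and not self.stopped:
            self.stop()


pipeline = Pipeline()
pipeline.executor.notify("print_msg", StageState.RUNNING)
print(pipeline.stopped)  # False
pipeline.executor.notify("print_msg", StageState.ERROR)
print(pipeline.stopped)  # True
```

The appeal of this design is that sources never need to poll anything themselves: the executor observes the failure and the pipeline stops every source from the outside, which sidesteps the blocking-iteration problem.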

Minimum reproducible example

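    import typing

    from morpheus.config import Config
    from morpheus.messages import MessageMeta
    from morpheus.pipeline.linear_pipeline import LinearPipeline
    from morpheus.pipeline.stage_decorator import stage
    from morpheus.stages.input.http_client_source_stage import HttpClientSourceStage

    config = Config()
    sleep_time = 0.1  # seconds between polls of the endpoint
    pipeline = LinearPipeline(config)

    # HttpClientSourceStage polls the endpoint indefinitely
    pipeline.set_source(
        HttpClientSourceStage(config, url="http://localhost:8080/api/v1/data", sleep_time=sleep_time))

    @stage
    def print_msg(msg: typing.Any, *, raise_error: bool = False) -> typing.Any:
        if isinstance(msg, MessageMeta):
            print(f"Received:\n{msg.df}")
            if raise_error:
                print("Raising error")
                raise ValueError("Error")
        else:
            print(f"Received:\n{msg}")

        return msg

    pipeline.add_stage(print_msg(config, raise_error=True))

    # Bug: after the ValueError the source keeps polling instead of shutting down
    pipeline.run()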



Code of Conduct

  • I agree to follow Morpheus' Code of Conduct
  • I have searched the open bugs and have found no duplicates for this bug report
dagardner-nv added the bug (Something isn't working) label Aug 9, 2024
dagardner-nv self-assigned this Aug 9, 2024
dagardner-nv added a commit to dagardner-nv/Morpheus that referenced this issue Aug 16, 2024
rapids-bot bot pushed a commit to nv-morpheus/MRC that referenced this issue Aug 29, 2024
* When a Python generator source yields a value and the subscriber is no longer subscribed, stop the source.
* Fix out of date docstring comment.

This is a partial fix for nv-morpheus/Morpheus#1838

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Anuradha Karuppiah (https://github.com/AnuradhaKaruppiah)

URL: #493
dagardner-nv added a commit to dagardner-nv/Morpheus that referenced this issue Aug 30, 2024
mdemoret-nv added this to the 24.10 - Release milestone Sep 11, 2024
rapids-bot closed this as completed in aa2c5df Sep 19, 2024