From a2ea1c4a45984b763081d567f6dde89f61ed4ec0 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 16 Sep 2023 21:42:17 +0000 Subject: [PATCH 01/23] flatmap --- dependencies.yaml | 8 ++ python/mrc/_pymrc/include/pymrc/operators.hpp | 1 + python/mrc/_pymrc/src/operators.cpp | 83 +++++++++++++++++++ python/mrc/core/operators.cpp | 1 + 4 files changed, 93 insertions(+) diff --git a/dependencies.yaml b/dependencies.yaml index 966608a19..85976ee05 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -8,6 +8,7 @@ files: includes: - empty - build_cpp + - dev_cpp - cudatoolkit channels: @@ -46,6 +47,13 @@ dependencies: - scikit-build>=0.17 - pybind11-stubgen=0.10 - python=3.10 + + dev_cpp: + common: + - output_types: [conda] + packages: + - clangdev=14 + cudatoolkit: specific: - output_types: [conda] diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 3a7b788d4..09ee61d31 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -60,6 +60,7 @@ class OperatorsProxy static PythonOperator build(PyFuncHolder build_fn); static PythonOperator filter(PyFuncHolder filter_fn); static PythonOperator flatten(); + static PythonOperator flatmap(OnDataFunction flatmap_fn); static PythonOperator map(OnDataFunction map_fn); static PythonOperator on_completed(PyFuncHolder()> finally_fn); static PythonOperator pairwise(); diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index cb0b679e1..ca48883bc 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -169,6 +170,88 @@ PythonOperator OperatorsProxy::flatten() }}; } +PythonOperator OperatorsProxy::flatmap(OnDataFunction flatmap_fn) +{ + // Build and return the map operator + return {"flatten", [=](PyObjectObservable source) { + return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { + source.subscribe( + sink, + [sink, flatmap_fn](PyHolder value) { + try + { + { + AcquireGIL gil; + + auto result = flatmap_fn(std::move(value)); + + auto inspect = pybind11::module_::import("inspect"); + + if (inspect.attr("iscoroutine")(result).cast()) + { + gil.release(); + throw std::runtime_error("flatmap does not yet support coroutines"); + } + + if (inspect.attr("isasyncgen")(result).cast()) + { + gil.release(); + throw std::runtime_error("flatmap does not yet support async generators"); + } + + // assume we got a list back and flatten it + + std::vector obj_list; + + { + auto l = py::list(std::move(result)); + + for (const auto& item : l) + { + // This increases the ref count by one but thats fine since the list will go + // out of scope and deref all its elements + obj_list.emplace_back(std::move(py::reinterpret_borrow(item))); + } + } + + if (sink.is_subscribed()) + { + // Release the GIL before calling on_next + gil.release(); + + // Loop over the list + for (auto& i : obj_list) + { + sink.on_next(std::move(i)); + } + } + } + } catch (py::error_already_set& err) + { + // Need the GIL here + AcquireGIL gil; + + py::print("Python error in callback hit!"); + py::print(err.what()); + + // Release before calling on_error + gil.release(); + + sink.on_error(std::current_exception()); + } + }, + [sink](std::exception_ptr ex) { + // Forward + sink.on_error(std::move(ex)); + }, + [sink]() { + // Forward + sink.on_completed(); + }); + }); + }}; +} + PythonOperator OperatorsProxy::map(OnDataFunction map_fn) { // Build and return the map operator diff --git a/python/mrc/core/operators.cpp b/python/mrc/core/operators.cpp index b74ff96ec..801764fc1 100644 --- a/python/mrc/core/operators.cpp +++ b/python/mrc/core/operators.cpp @@ -55,6 +55,7 @@ PYBIND11_MODULE(operators, py_mod) py_mod.def("build", &OperatorsProxy::build); py_mod.def("filter", &OperatorsProxy::filter); py_mod.def("flatten", &OperatorsProxy::flatten); + py_mod.def("flatmap", &OperatorsProxy::flatmap); py_mod.def("map", &OperatorsProxy::map); py_mod.def("on_completed", &OperatorsProxy::on_completed); py_mod.def("pairwise", &OperatorsProxy::pairwise); From f6e48d3f194beb43952b928533aff2739c0245cc Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sun, 17 Sep 2023 23:17:56 +0000 Subject: [PATCH 02/23] update flatmap to detect coroutines and async generators --- python/mrc/_pymrc/include/pymrc/operators.hpp | 2 +- python/mrc/_pymrc/src/operators.cpp | 67 +++++++++---------- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 09ee61d31..95854ac93 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -60,7 +60,7 @@ class OperatorsProxy static PythonOperator build(PyFuncHolder build_fn); static PythonOperator filter(PyFuncHolder filter_fn); static PythonOperator flatten(); - static PythonOperator flatmap(OnDataFunction flatmap_fn); + static PythonOperator flatmap(PyFuncHolder flatmap_fn); static PythonOperator map(OnDataFunction map_fn); static PythonOperator on_completed(PyFuncHolder()> finally_fn); static PythonOperator pairwise(); diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index ca48883bc..0765a18ee 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -170,7 +170,7 @@ PythonOperator OperatorsProxy::flatten() }}; } -PythonOperator OperatorsProxy::flatmap(OnDataFunction flatmap_fn) +PythonOperator OperatorsProxy::flatmap(PyFuncHolder flatmap_fn) { // Build and return the map operator return {"flatten", [=](PyObjectObservable source) { @@ -180,52 +180,51 @@ PythonOperator OperatorsProxy::flatmap(OnDataFunction flatmap_fn) [sink, flatmap_fn](PyHolder value) { try { - { - AcquireGIL gil; + AcquireGIL gil; - auto result = flatmap_fn(std::move(value)); + auto result = flatmap_fn(std::move(value)); - auto inspect = pybind11::module_::import("inspect"); + auto inspect = pybind11::module_::import("inspect"); - if (inspect.attr("iscoroutine")(result).cast()) - { - gil.release(); - throw std::runtime_error("flatmap does not yet support coroutines"); - } + if (inspect.attr("iscoroutine")(result).cast()) + { + gil.release(); + throw std::runtime_error("flatmap does not yet support coroutines"); + } - if (inspect.attr("isasyncgen")(result).cast()) - { - gil.release(); - throw std::runtime_error("flatmap does not yet support async generators"); - } + if (inspect.attr("isasyncgen")(result).cast()) + { + gil.release(); + throw std::runtime_error("flatmap does not yet support async generators"); + } - // assume we got a list back and flatten it + // assume we got a list back and flatten it - std::vector obj_list; + std::vector obj_list; + { + auto l = py::list(std::move(result)); + + for (const auto& item : l) { - auto l = py::list(std::move(result)); - - for (const auto& item : l) - { - // This increases the ref count by one but thats fine since the list will go - // out of scope and deref all its elements - obj_list.emplace_back(std::move(py::reinterpret_borrow(item))); - } + // This increases the ref count by one but thats fine since the list will go + // out of scope and deref all its elements + obj_list.emplace_back(std::move(py::reinterpret_borrow(item))); } + } - if (sink.is_subscribed()) + if (sink.is_subscribed()) + { + // Release the GIL before calling on_next + gil.release(); + + // Loop over the list + for (auto& i : obj_list) { - // Release the GIL before calling on_next - gil.release(); - - // Loop over the list - for (auto& i : obj_list) - { - sink.on_next(std::move(i)); - } + sink.on_next(std::move(i)); } } + } catch (py::error_already_set& err) { // Need the GIL here From 7d4cdee9371d62966beec3e86d480b37f7cd6cda Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 19 Sep 2023 22:45:34 +0000 Subject: [PATCH 03/23] add naive async generator support to flatmap --- python/mrc/_pymrc/src/operators.cpp | 77 +++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 5 deletions(-) diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 0765a18ee..3ffda1263 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -23,6 +23,7 @@ #include #include +#include #include // IWYU pragma: keep #include #include @@ -31,6 +32,7 @@ #include #include +#include #include #include #include @@ -175,9 +177,46 @@ PythonOperator OperatorsProxy::flatmap(PyFuncHolder::create([=](PyObjectSubscriber sink) { + AcquireGIL gil; + + auto asyncio = pybind11::module_::import("asyncio"); + PyHolder loop; + + try + { + loop = asyncio.attr("get_event_loop")(); + } catch (std::runtime_error ex) + { + loop = asyncio.attr("new_event_loop")(); + } + + auto is_running = loop.attr("is_running")().cast(); + + std::thread loop_thread; + + std::atomic cancellation_token = false; + + if (not is_running) + { + loop_thread = std::thread([&cancellation_token, loop = loop]() { + while (not cancellation_token) + { + { + // run event loop once + pybind11::gil_scoped_acquire acquire; + loop.attr("stop")(); + loop.attr("run_forever")(); + } + std::this_thread::yield(); + } + }); + } + + gil.release(); + source.subscribe( sink, - [sink, flatmap_fn](PyHolder value) { + [sink, flatmap_fn, loop](PyHolder value) { try { AcquireGIL gil; @@ -194,11 +233,32 @@ PythonOperator OperatorsProxy::flatmap(PyFuncHolder()) { - gil.release(); - throw std::runtime_error("flatmap does not yet support async generators"); - } + auto asyncio = py::module_::import("asyncio"); - // assume we got a list back and flatten it + while (true) + { + auto task = result.attr("__anext__")(); + auto future = asyncio.attr("run_coroutine_threadsafe")(task, loop); + + while (not future.attr("done")().cast()) { + using namespace std::chrono_literals; + pybind11::gil_scoped_release release; + std::this_thread::yield(); + // std::this_thread::sleep_for(100ms); + } + + try{ + auto value = future.attr("result")(); + pybind11::gil_scoped_release release; + sink.on_next(std::move(py::reinterpret_borrow(value))); + } catch (std::exception ex){ + // probably an end async iteration exception. + break; + } + } + + return; + } std::vector obj_list; @@ -245,8 +305,15 @@ PythonOperator OperatorsProxy::flatmap(PyFuncHolder Date: Wed, 20 Sep 2023 03:17:30 +0000 Subject: [PATCH 04/23] async operator handler --- python/mrc/_pymrc/include/pymrc/operators.hpp | 18 +++ python/mrc/_pymrc/src/operators.cpp | 152 ++++++++++-------- 2 files changed, 103 insertions(+), 67 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 95854ac93..ca627adb4 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -54,6 +54,24 @@ class OperatorProxy static std::string get_name(PythonOperator& self); }; +class AsyncOperatorHandler +{ + public: + AsyncOperatorHandler(PyObjectSubscriber sink); + ~AsyncOperatorHandler(); + + void process_async_generator(PyObjectHolder asyncgen); + void join(); + + private: + PyObjectSubscriber m_sink; + pybind11::module_ m_asyncio; + PyHolder m_loop; + std::thread m_loop_thread; + std::atomic m_loop_ct = false; + std::atomic m_outstanding = 0; +}; + class OperatorsProxy { public: diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 3ffda1263..d00eddd30 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -172,51 +172,97 @@ PythonOperator OperatorsProxy::flatten() }}; } -PythonOperator OperatorsProxy::flatmap(PyFuncHolder flatmap_fn) +AsyncOperatorHandler::AsyncOperatorHandler(PyObjectSubscriber sink) : m_sink(std::move(sink)) { - // Build and return the map operator - return {"flatten", [=](PyObjectObservable source) { - return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { - AcquireGIL gil; + pybind11::gil_scoped_acquire acquire; - auto asyncio = pybind11::module_::import("asyncio"); - PyHolder loop; + m_asyncio = pybind11::module_::import("asyncio"); - try - { - loop = asyncio.attr("get_event_loop")(); - } catch (std::runtime_error ex) - { - loop = asyncio.attr("new_event_loop")(); - } + try + { + m_loop = m_asyncio.attr("get_event_loop")(); + } catch (std::runtime_error ex) + { + m_loop = m_asyncio.attr("new_event_loop")(); + } - auto is_running = loop.attr("is_running")().cast(); + auto is_running = m_loop.attr("is_running")().cast(); - std::thread loop_thread; + if (is_running) + { + return; + } - std::atomic cancellation_token = false; + if (not is_running) + { + m_loop_thread = std::thread([&loop_ct = m_loop_ct, loop = m_loop]() { + while (not loop_ct) + { + { + // run event loop once + pybind11::gil_scoped_acquire acquire; + loop.attr("stop")(); + loop.attr("run_forever")(); + } + std::this_thread::yield(); + } + }); + } +} - if (not is_running) - { - loop_thread = std::thread([&cancellation_token, loop = loop]() { - while (not cancellation_token) - { - { - // run event loop once - pybind11::gil_scoped_acquire acquire; - loop.attr("stop")(); - loop.attr("run_forever")(); - } - std::this_thread::yield(); - } - }); - } +AsyncOperatorHandler::~AsyncOperatorHandler() +{ + if (m_loop_thread.joinable()) + { + m_loop_ct = true; + m_loop_thread.join(); + } +} - gil.release(); +void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen) +{ + auto task = asyncgen.attr("__anext__")(); + auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, m_loop); + + m_outstanding++; + + future.attr("add_done_callback")(py::cpp_function([this, asyncgen](py::object future) { + pybind11::gil_scoped_acquire acquire; + try + { + auto value = future.attr("result")(); + { + pybind11::gil_scoped_release release; + m_sink.on_next(std::move(py::reinterpret_borrow(value))); + } + this->process_async_generator(asyncgen); + m_outstanding--; + } catch (std::exception ex) + { + m_outstanding--; + // probably an end async iteration exception. + return; + } + })); +} + +void AsyncOperatorHandler::join() +{ + while (m_outstanding > 0) + { + std::this_thread::yield(); + } +} +PythonOperator OperatorsProxy::flatmap(PyFuncHolder flatmap_fn) +{ + // Build and return the map operator + return {"flatten", [=](PyObjectObservable source) { + return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { + auto async_handler = std::make_unique(sink); source.subscribe( sink, - [sink, flatmap_fn, loop](PyHolder value) { + [sink, flatmap_fn, &async_handler = *async_handler](PyHolder value) { try { AcquireGIL gil; @@ -233,30 +279,7 @@ PythonOperator OperatorsProxy::flatmap(PyFuncHolder()) { - auto asyncio = py::module_::import("asyncio"); - - while (true) - { - auto task = result.attr("__anext__")(); - auto future = asyncio.attr("run_coroutine_threadsafe")(task, loop); - - while (not future.attr("done")().cast()) { - using namespace std::chrono_literals; - pybind11::gil_scoped_release release; - std::this_thread::yield(); - // std::this_thread::sleep_for(100ms); - } - - try{ - auto value = future.attr("result")(); - pybind11::gil_scoped_release release; - sink.on_next(std::move(py::reinterpret_borrow(value))); - } catch (std::exception ex){ - // probably an end async iteration exception. - break; - } - } - + async_handler.process_async_generator(result); return; } @@ -299,21 +322,16 @@ PythonOperator OperatorsProxy::flatmap(PyFuncHolder Date: Wed, 20 Sep 2023 03:25:57 +0000 Subject: [PATCH 05/23] rename flatmap -> flatmap_async --- python/mrc/_pymrc/include/pymrc/operators.hpp | 2 +- python/mrc/_pymrc/src/operators.cpp | 56 +++---------------- python/mrc/core/operators.cpp | 2 +- 3 files changed, 10 insertions(+), 50 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index ca627adb4..428b3a5e3 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -78,7 +78,7 @@ class OperatorsProxy static PythonOperator build(PyFuncHolder build_fn); static PythonOperator filter(PyFuncHolder filter_fn); static PythonOperator flatten(); - static PythonOperator flatmap(PyFuncHolder flatmap_fn); + static PythonOperator flatmap_async(PyFuncHolder flatmap_fn); static PythonOperator map(OnDataFunction map_fn); static PythonOperator on_completed(PyFuncHolder()> finally_fn); static PythonOperator pairwise(); diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index d00eddd30..c9982d5de 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -254,7 +254,7 @@ void AsyncOperatorHandler::join() } } -PythonOperator OperatorsProxy::flatmap(PyFuncHolder flatmap_fn) +PythonOperator OperatorsProxy::flatmap_async(PyFuncHolder flatmap_fn) { // Build and return the map operator return {"flatten", [=](PyObjectObservable source) { @@ -265,59 +265,19 @@ PythonOperator OperatorsProxy::flatmap(PyFuncHolder()) - { - gil.release(); - throw std::runtime_error("flatmap does not yet support coroutines"); - } - - if (inspect.attr("isasyncgen")(result).cast()) - { - async_handler.process_async_generator(result); - return; - } - - std::vector obj_list; - - { - auto l = py::list(std::move(result)); - - for (const auto& item : l) - { - // This increases the ref count by one but thats fine since the list will go - // out of scope and deref all its elements - obj_list.emplace_back(std::move(py::reinterpret_borrow(item))); - } - } - - if (sink.is_subscribed()) - { - // Release the GIL before calling on_next - gil.release(); - - // Loop over the list - for (auto& i : obj_list) - { - sink.on_next(std::move(i)); - } - } + async_handler.process_async_generator(result); } catch (py::error_already_set& err) { - // Need the GIL here - AcquireGIL gil; - - py::print("Python error in callback hit!"); - py::print(err.what()); - - // Release before calling on_error - gil.release(); + { + pybind11::gil_scoped_acquire acquire; + py::print("Python error in callback hit!"); + py::print(err.what()); + } sink.on_error(std::current_exception()); } diff --git a/python/mrc/core/operators.cpp b/python/mrc/core/operators.cpp index 801764fc1..0b2efbaa9 100644 --- a/python/mrc/core/operators.cpp +++ b/python/mrc/core/operators.cpp @@ -55,7 +55,7 @@ PYBIND11_MODULE(operators, py_mod) py_mod.def("build", &OperatorsProxy::build); py_mod.def("filter", &OperatorsProxy::filter); py_mod.def("flatten", &OperatorsProxy::flatten); - py_mod.def("flatmap", &OperatorsProxy::flatmap); + py_mod.def("flatmap_async", &OperatorsProxy::flatmap_async); py_mod.def("map", &OperatorsProxy::map); py_mod.def("on_completed", &OperatorsProxy::on_completed); py_mod.def("pairwise", &OperatorsProxy::pairwise); From 1559d7c87e3241645854e711d628c7b34f57d583 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 20 Sep 2023 03:27:42 +0000 Subject: [PATCH 06/23] remove erroneous eval include --- python/mrc/_pymrc/src/operators.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index c9982d5de..34a5d2c69 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -23,7 +23,6 @@ #include #include -#include #include // IWYU pragma: keep #include #include From 9d738cf1b1f69e208c29928c63795dd7bc5c5177 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 20 Sep 2023 16:10:49 +0000 Subject: [PATCH 07/23] better error handling --- python/mrc/_pymrc/include/pymrc/operators.hpp | 5 +- python/mrc/_pymrc/src/operators.cpp | 66 ++++++++++++------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 428b3a5e3..dfa644522 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -61,14 +61,15 @@ class AsyncOperatorHandler ~AsyncOperatorHandler(); void process_async_generator(PyObjectHolder asyncgen); - void join(); + void wait_completed(); + void wait_cancel(); private: PyObjectSubscriber m_sink; pybind11::module_ m_asyncio; PyHolder m_loop; std::thread m_loop_thread; - std::atomic m_loop_ct = false; + std::atomic m_loop_ct = false; std::atomic m_outstanding = 0; }; diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 34a5d2c69..690335e6d 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -192,21 +193,19 @@ AsyncOperatorHandler::AsyncOperatorHandler(PyObjectSubscriber sink) : m_sink(std return; } - if (not is_running) - { - m_loop_thread = std::thread([&loop_ct = m_loop_ct, loop = m_loop]() { - while (not loop_ct) + m_loop_thread = std::thread([&loop_ct = m_loop_ct, loop = m_loop]() { + while (not loop_ct) + { { - { - // run event loop once - pybind11::gil_scoped_acquire acquire; - loop.attr("stop")(); - loop.attr("run_forever")(); - } - std::this_thread::yield(); + // run event loop once + pybind11::gil_scoped_acquire acquire; + loop.attr("stop")(); + loop.attr("run_forever")(); } - }); - } + + std::this_thread::yield(); + } + }); } AsyncOperatorHandler::~AsyncOperatorHandler() @@ -229,23 +228,42 @@ void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen) pybind11::gil_scoped_acquire acquire; try { - auto value = future.attr("result")(); + if (m_sink.is_subscribed()) { - pybind11::gil_scoped_release release; - m_sink.on_next(std::move(py::reinterpret_borrow(value))); + auto value = future.attr("result")(); + { + pybind11::gil_scoped_release release; + m_sink.on_next(std::move(py::reinterpret_borrow(value))); + } } + + // should we continue to process the generator even if we're not subscribed? this->process_async_generator(asyncgen); - m_outstanding--; + } catch (py::error_already_set ex) + { + if (not ex.matches(PyExc_StopAsyncIteration)) + { + throw; // does this get caught by the next catch? + } } catch (std::exception ex) { - m_outstanding--; - // probably an end async iteration exception. - return; + pybind11::gil_scoped_release release; + if (m_sink.is_subscribed()) + { + m_sink.on_error(std::current_exception()); + } } + + m_outstanding--; })); } -void AsyncOperatorHandler::join() +void AsyncOperatorHandler::wait_cancel() +{ + // TOOD: determine if/how to cancel in-flight and future async generators +} + +void AsyncOperatorHandler::wait_completed() { while (m_outstanding > 0) { @@ -283,12 +301,12 @@ PythonOperator OperatorsProxy::flatmap_async(PyFuncHolder Date: Wed, 20 Sep 2023 20:37:52 +0000 Subject: [PATCH 08/23] broken error handling --- python/mrc/_pymrc/src/operators.cpp | 37 +++++++++++++++-------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 690335e6d..e8aea97f9 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -219,38 +219,41 @@ AsyncOperatorHandler::~AsyncOperatorHandler() void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen) { + pybind11::gil_scoped_acquire acquire; auto task = asyncgen.attr("__anext__")(); auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, m_loop); m_outstanding++; future.attr("add_done_callback")(py::cpp_function([this, asyncgen](py::object future) { - pybind11::gil_scoped_acquire acquire; try { if (m_sink.is_subscribed()) { - auto value = future.attr("result")(); - { - pybind11::gil_scoped_release release; - m_sink.on_next(std::move(py::reinterpret_borrow(value))); - } + auto acquire = std::make_unique(); + auto value = future.attr("result")(); + acquire.reset(); + m_sink.on_next(std::move(py::reinterpret_borrow(value))); } // should we continue to process the generator even if we're not subscribed? this->process_async_generator(asyncgen); - } catch (py::error_already_set ex) + } catch (py::error_already_set& ex) { if (not ex.matches(PyExc_StopAsyncIteration)) { - throw; // does this get caught by the next catch? + { + pybind11::gil_scoped_acquire acquire; + py::print("got python exception!", ex.what()); + } + if (m_sink.is_subscribed()) + { + m_sink.on_error(std::current_exception()); + } } - } catch (std::exception ex) - { - pybind11::gil_scoped_release release; - if (m_sink.is_subscribed()) { - m_sink.on_error(std::current_exception()); + pybind11::gil_scoped_acquire acquire; + py::print("error_already_set post"); } } @@ -282,12 +285,10 @@ PythonOperator OperatorsProxy::flatmap_async(PyFuncHolder(); + auto result = flatmap_fn(std::move(value)); + acquire.reset(); async_handler.process_async_generator(result); - } catch (py::error_already_set& err) { { From f4f9da6329cc6fa69957c5dcbd5be72b15e226cb Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 22 Sep 2023 17:32:14 +0000 Subject: [PATCH 09/23] concat_map_async --- python/mrc/_pymrc/include/pymrc/operators.hpp | 15 +- python/mrc/_pymrc/src/operators.cpp | 146 ++++++++---------- python/mrc/core/operators.cpp | 2 +- 3 files changed, 72 insertions(+), 91 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index dfa644522..2cf2424d6 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -19,6 +19,9 @@ #include "pymrc/types.hpp" +#include + +#include #include #include @@ -57,20 +60,16 @@ class OperatorProxy class AsyncOperatorHandler { public: - AsyncOperatorHandler(PyObjectSubscriber sink); + AsyncOperatorHandler(); ~AsyncOperatorHandler(); - void process_async_generator(PyObjectHolder asyncgen); - void wait_completed(); - void wait_cancel(); + boost::fibers::future process_async_generator(PyObjectHolder asyncgen); private: - PyObjectSubscriber m_sink; pybind11::module_ m_asyncio; PyHolder m_loop; std::thread m_loop_thread; - std::atomic m_loop_ct = false; - std::atomic m_outstanding = 0; + std::atomic m_loop_ct = false; }; class OperatorsProxy @@ -79,7 +78,7 @@ class OperatorsProxy static PythonOperator build(PyFuncHolder build_fn); static PythonOperator filter(PyFuncHolder filter_fn); static PythonOperator flatten(); - static PythonOperator flatmap_async(PyFuncHolder flatmap_fn); + static PythonOperator concat_map_async(PyFuncHolder flatmap_fn); static PythonOperator map(OnDataFunction map_fn); static PythonOperator on_completed(PyFuncHolder()> finally_fn); static PythonOperator pairwise(); diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index e8aea97f9..f934c0d49 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -21,6 +21,11 @@ #include "pymrc/utilities/acquire_gil.hpp" #include "pymrc/utilities/function_wrappers.hpp" +#include +#include +#include +#include +#include #include #include #include // IWYU pragma: keep @@ -31,6 +36,8 @@ #include #include +#include +#include #include #include #include @@ -172,9 +179,9 @@ PythonOperator OperatorsProxy::flatten() }}; } -AsyncOperatorHandler::AsyncOperatorHandler(PyObjectSubscriber sink) : m_sink(std::move(sink)) +AsyncOperatorHandler::AsyncOperatorHandler() { - pybind11::gil_scoped_acquire acquire; + py::gil_scoped_acquire acquire; m_asyncio = pybind11::module_::import("asyncio"); @@ -183,29 +190,21 @@ AsyncOperatorHandler::AsyncOperatorHandler(PyObjectSubscriber sink) : m_sink(std m_loop = m_asyncio.attr("get_event_loop")(); } catch (std::runtime_error ex) { - m_loop = m_asyncio.attr("new_event_loop")(); - } - - auto is_running = m_loop.attr("is_running")().cast(); - - if (is_running) - { - return; - } - - m_loop_thread = std::thread([&loop_ct = m_loop_ct, loop = m_loop]() { - while (not loop_ct) - { + m_loop = m_asyncio.attr("new_event_loop")(); + m_loop_thread = std::thread([&loop_ct = m_loop_ct, loop = m_loop]() { + while (not loop_ct) { - // run event loop once - pybind11::gil_scoped_acquire acquire; - loop.attr("stop")(); - loop.attr("run_forever")(); - } + { + // run event loop once + py::gil_scoped_acquire acquire; + loop.attr("stop")(); + loop.attr("run_forever")(); + } - std::this_thread::yield(); - } - }); + std::this_thread::yield(); + } + }); + } } AsyncOperatorHandler::~AsyncOperatorHandler() @@ -217,97 +216,80 @@ AsyncOperatorHandler::~AsyncOperatorHandler() } } -void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen) +boost::fibers::future AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen) { - pybind11::gil_scoped_acquire acquire; + py::gil_scoped_acquire acquire; + auto task = asyncgen.attr("__anext__")(); auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, m_loop); + auto result = std::make_unique>(); - m_outstanding++; + auto result_future = result->get_future(); - future.attr("add_done_callback")(py::cpp_function([this, asyncgen](py::object future) { + future.attr("add_done_callback")(py::cpp_function([result = std::move(result)](py::object future) { try { - if (m_sink.is_subscribed()) - { - auto acquire = std::make_unique(); - auto value = future.attr("result")(); - acquire.reset(); - m_sink.on_next(std::move(py::reinterpret_borrow(value))); - } - - // should we continue to process the generator even if we're not subscribed? - this->process_async_generator(asyncgen); - } catch (py::error_already_set& ex) + auto acquire = std::make_unique(); + auto value = future.attr("result")(); + acquire.reset(); + result->set_value(std::move(py::reinterpret_borrow(value))); + } catch (std::exception& ex) { - if (not ex.matches(PyExc_StopAsyncIteration)) - { - { - pybind11::gil_scoped_acquire acquire; - py::print("got python exception!", ex.what()); - } - if (m_sink.is_subscribed()) - { - m_sink.on_error(std::current_exception()); - } - } - { - pybind11::gil_scoped_acquire acquire; - py::print("error_already_set post"); - } + result->set_exception(std::current_exception()); } - - m_outstanding--; })); -} - -void AsyncOperatorHandler::wait_cancel() -{ - // TOOD: determine if/how to cancel in-flight and future async generators -} -void AsyncOperatorHandler::wait_completed() -{ - while (m_outstanding > 0) - { - std::this_thread::yield(); - } + return result_future; } -PythonOperator OperatorsProxy::flatmap_async(PyFuncHolder flatmap_fn) +PythonOperator OperatorsProxy::concat_map_async(PyFuncHolder flatmap_fn) { // Build and return the map operator return {"flatten", [=](PyObjectObservable source) { return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { - auto async_handler = std::make_unique(sink); + auto async_handler = std::make_unique(); source.subscribe( sink, [sink, flatmap_fn, &async_handler = *async_handler](PyHolder value) { - try - { - auto acquire = std::make_unique(); - auto result = flatmap_fn(std::move(value)); - acquire.reset(); - async_handler.process_async_generator(result); - } catch (py::error_already_set& err) + auto acquire = std::make_unique(); + auto asyncgen = flatmap_fn(std::move(value)); + acquire.reset(); + while (sink.is_subscribed()) { + using namespace std::chrono_literals; + + auto gen_yielded = async_handler.process_async_generator(asyncgen); + + while (gen_yielded.wait_for(0s) != boost::fibers::future_status::ready) { - pybind11::gil_scoped_acquire acquire; - py::print("Python error in callback hit!"); - py::print(err.what()); + boost::this_fiber::yield(); + + if (not sink.is_subscribed()) + { + return; + } } - sink.on_error(std::current_exception()); + try + { + sink.on_next(gen_yielded.get()); + } catch (pybind11::error_already_set& ex) + { + if (ex.matches(PyExc_StopAsyncIteration)) + { + return; + } + + sink.on_error(std::current_exception()); + } } }, [sink, &async_handler = *async_handler](std::exception_ptr ex) { // Forward - async_handler.wait_cancel(); sink.on_error(std::current_exception()); }, [sink, &async_handler = *async_handler]() { // Forward - async_handler.wait_completed(); sink.on_completed(); }); }); diff --git a/python/mrc/core/operators.cpp b/python/mrc/core/operators.cpp index 0b2efbaa9..7a3d3de89 100644 --- a/python/mrc/core/operators.cpp +++ b/python/mrc/core/operators.cpp @@ -55,7 +55,7 @@ PYBIND11_MODULE(operators, py_mod) py_mod.def("build", &OperatorsProxy::build); py_mod.def("filter", &OperatorsProxy::filter); py_mod.def("flatten", &OperatorsProxy::flatten); - py_mod.def("flatmap_async", &OperatorsProxy::flatmap_async); + py_mod.def("concat_map_async", &OperatorsProxy::concat_map_async); py_mod.def("map", &OperatorsProxy::map); py_mod.def("on_completed", &OperatorsProxy::on_completed); py_mod.def("pairwise", &OperatorsProxy::pairwise); From 4ea72bfd94f17b7b46a11036d87e0fdb88a5cb7f Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 22 Sep 2023 18:23:19 +0000 Subject: [PATCH 10/23] setup debugging for concat_map_async --- python/mrc/_pymrc/include/pymrc/executor.hpp | 3 ++ python/mrc/_pymrc/src/operators.cpp | 29 ++++++++++++-------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/executor.hpp b/python/mrc/_pymrc/include/pymrc/executor.hpp index c339d925a..cec4c43ca 100644 --- a/python/mrc/_pymrc/include/pymrc/executor.hpp +++ b/python/mrc/_pymrc/include/pymrc/executor.hpp @@ -32,6 +32,9 @@ class IExecutor; } namespace mrc::pymrc { + +std::function create_gil_initializer(); + class Pipeline; // Export everything in the mrc::pymrc namespace by default since we compile with -fvisibility=hidden diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index f934c0d49..b4ea0b6cf 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -17,6 +17,7 @@ #include "pymrc/operators.hpp" +#include "pymrc/executor.hpp" #include "pymrc/types.hpp" #include "pymrc/utilities/acquire_gil.hpp" #include "pymrc/utilities/function_wrappers.hpp" @@ -190,20 +191,26 @@ AsyncOperatorHandler::AsyncOperatorHandler() m_loop = m_asyncio.attr("get_event_loop")(); } catch (std::runtime_error ex) { + auto setup_debugging = create_gil_initializer(); + m_loop = m_asyncio.attr("new_event_loop")(); - m_loop_thread = std::thread([&loop_ct = m_loop_ct, loop = m_loop]() { - while (not loop_ct) - { + m_loop_thread = std::thread( + [&loop_ct = m_loop_ct, loop = m_loop](std::function setup_debugging) { + setup_debugging(); + + while (not loop_ct) { - // run event loop once - py::gil_scoped_acquire acquire; - loop.attr("stop")(); - loop.attr("run_forever")(); - } + { + // run event loop once + py::gil_scoped_acquire acquire; + loop.attr("stop")(); + loop.attr("run_forever")(); + } - std::this_thread::yield(); - } - }); + std::this_thread::yield(); + } + }, + std::move(setup_debugging)); } } From b717bd5346df85c93e149d96fe3b0aad810509f0 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 22 Sep 2023 18:26:47 +0000 Subject: [PATCH 11/23] use proper name for concat_map_async --- python/mrc/_pymrc/src/operators.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index b4ea0b6cf..01c0bef03 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -252,7 +252,7 @@ boost::fibers::future AsyncOperatorHandler::process_async_genera PythonOperator OperatorsProxy::concat_map_async(PyFuncHolder flatmap_fn) { // Build and return the map operator - return {"flatten", [=](PyObjectObservable source) { + return {"concat_map_async", [=](PyObjectObservable source) { return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { auto async_handler = std::make_unique(); source.subscribe( From 3b0f778ae049ab780462ebc5034776a9b4f91241 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Mon, 25 Sep 2023 00:23:23 +0000 Subject: [PATCH 12/23] add PythonNodeContext --- .../mrc/runnable/detail/type_traits.hpp | 12 +++--------- cpp/mrc/include/mrc/segment/builder.hpp | 9 +++++++++ dependencies.yaml | 2 +- python/mrc/_pymrc/include/pymrc/node.hpp | 14 +++++++++++--- python/mrc/_pymrc/src/node.cpp | 18 +++++++++++++++++- python/mrc/_pymrc/src/segment.cpp | 6 ++++-- 6 files changed, 45 insertions(+), 16 deletions(-) diff --git a/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp b/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp index d33d7be84..0e6dab75a 100644 --- a/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp +++ b/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp @@ -85,16 +85,10 @@ static auto unwrap_context(l4_concept c, ThreadContext& t) return std::make_pair(self{}, self{}); } -static auto unwrap_context(l2_concept c, Context& t) +template +static auto unwrap_context(l2_concept c, T& t) { - return std::make_pair(self{}, self{}); -} - -template -static error unwrap_context(error e, T& t) -{ - static_assert(invalid_concept::error, "object is not a Context"); - return {}; + return std::make_pair(self{}, self{}); } template diff --git a/cpp/mrc/include/mrc/segment/builder.hpp b/cpp/mrc/include/mrc/segment/builder.hpp index a35f571c9..d03ce8b04 100644 --- a/cpp/mrc/include/mrc/segment/builder.hpp +++ b/cpp/mrc/include/mrc/segment/builder.hpp @@ -278,6 +278,9 @@ class IBuilder typename... ArgsT> auto make_node(std::string name, ArgsT&&... ops); + template + auto make_node_explicit(std::string name, ArgsT&&... ops); + /** * Creates and returns an instance of a node component with the specified type, name and arguments. * @tparam SinkTypeT The sink type of the node component to be created. @@ -436,6 +439,12 @@ auto IBuilder::make_node(std::string name, ArgsT&&... ops) return construct_object>(name, std::forward(ops)...); } +template +auto IBuilder::make_node_explicit(std::string name, ArgsT&&... ops) +{ + return construct_object(name, std::forward(ops)...); +} + template class NodeTypeT, typename... ArgsT> auto IBuilder::make_node_component(std::string name, ArgsT&&... ops) { diff --git a/dependencies.yaml b/dependencies.yaml index 85976ee05..0ebfd8495 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -52,7 +52,7 @@ dependencies: common: - output_types: [conda] packages: - - clangdev=14 + - clangdev=16 cudatoolkit: specific: diff --git a/python/mrc/_pymrc/include/pymrc/node.hpp b/python/mrc/_pymrc/include/pymrc/node.hpp index f5d72e7c3..3271e4179 100644 --- a/python/mrc/_pymrc/include/pymrc/node.hpp +++ b/python/mrc/_pymrc/include/pymrc/node.hpp @@ -39,6 +39,7 @@ #include #include +#include #include #include #include @@ -310,14 +311,21 @@ class PythonSinkComponent : public node::RxSinkComponent, using base_t::base_t; }; -template -class PythonNode : public node::RxNode, +class PythonNodeContext : public mrc::runnable::Context +{ + public: + PythonNodeContext(std::size_t rank, std::size_t size); + ~PythonNodeContext() override; +}; + +template +class PythonNode : public node::RxNode, public pymrc::AutoRegSourceAdapter, public pymrc::AutoRegSinkAdapter, public pymrc::AutoRegIngressPort, public pymrc::AutoRegEgressPort { - using base_t = node::RxNode; + using base_t = node::RxNode; public: using typename base_t::stream_fn_t; diff --git a/python/mrc/_pymrc/src/node.cpp b/python/mrc/_pymrc/src/node.cpp index c3a1563e5..52aa32664 100644 --- a/python/mrc/_pymrc/src/node.cpp +++ b/python/mrc/_pymrc/src/node.cpp @@ -17,4 +17,20 @@ #include "pymrc/node.hpp" -namespace mrc::pymrc {} // namespace mrc::pymrc +#include + +namespace mrc::pymrc { + +PythonNodeContext::PythonNodeContext(std::size_t rank, std::size_t size) : mrc::runnable::Context(rank, size) +{ + pybind11::gil_scoped_acquire acquire; + pybind11::print("PythonNodeContext::ctor"); +} + +PythonNodeContext::~PythonNodeContext() +{ + pybind11::gil_scoped_acquire acquire; + pybind11::print("PythonNodeContext::dtor"); +} + +} // namespace mrc::pymrc diff --git a/python/mrc/_pymrc/src/segment.cpp b/python/mrc/_pymrc/src/segment.cpp index 4e60e63e4..5957bbe72 100644 --- a/python/mrc/_pymrc/src/segment.cpp +++ b/python/mrc/_pymrc/src/segment.cpp @@ -390,7 +390,8 @@ std::shared_ptr BuilderProxy::make_node(mrc::seg const std::string& name, pybind11::args operators) { - auto node = self.make_node(name); + // auto node = self.make_node(name); + auto node = self.make_node_explicit>(name); node->object().make_stream( [operators = PyObjectHolder(std::move(operators))](const PyObjectObservable& input) -> PyObjectObservable { @@ -412,7 +413,8 @@ std::shared_ptr BuilderProxy::make_node_full( "make_node_full(name, sub_fn) is deprecated and will be removed in a future version. Use " "make_node(name, mrc.core.operators.build(sub_fn)) instead."); - auto node = self.make_node(name); + // auto node = self.make_node(name); + auto node = self.make_node_explicit>(name); node->object().make_stream([sub_fn](const PyObjectObservable& input) -> PyObjectObservable { return rxcpp::observable<>::create([input, sub_fn](pymrc::PyObjectSubscriber output) { From be6b4221561b2efa3d0b415dc3081120d50ff6a3 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Mon, 25 Sep 2023 02:14:44 +0000 Subject: [PATCH 13/23] PythonNodeLoopHandle --- python/mrc/_pymrc/include/pymrc/node.hpp | 25 ++++++ python/mrc/_pymrc/include/pymrc/operators.hpp | 5 +- python/mrc/_pymrc/src/node.cpp | 76 ++++++++++++++++++- python/mrc/_pymrc/src/operators.cpp | 47 ++---------- 4 files changed, 105 insertions(+), 48 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/node.hpp b/python/mrc/_pymrc/include/pymrc/node.hpp index 3271e4179..f785431e1 100644 --- a/python/mrc/_pymrc/include/pymrc/node.hpp +++ b/python/mrc/_pymrc/include/pymrc/node.hpp @@ -39,6 +39,7 @@ #include #include +#include #include #include #include @@ -311,11 +312,35 @@ class PythonSinkComponent : public node::RxSinkComponent, using base_t::base_t; }; +class PythonNodeLoopHandle +{ + public: + PythonNodeLoopHandle(); + ~PythonNodeLoopHandle(); + + uint32_t inc_ref(); + uint32_t dec_ref(); + + PyHolder get_asyncio_event_loop(); + + private: + uint32_t m_references = 0; + PyHolder m_loop; + std::atomic m_loop_ct = false; + std::thread m_loop_thread; +}; + class PythonNodeContext : public mrc::runnable::Context { public: PythonNodeContext(std::size_t rank, std::size_t size); ~PythonNodeContext() override; + + PyHolder get_asyncio_event_loop(); + + private: + // this was intended to be a thread-specific-pointer, but I couldn't get boost::thread to link. + std::unique_ptr m_loop_handle; }; template diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 2cf2424d6..62907e713 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -61,15 +61,12 @@ class AsyncOperatorHandler { public: AsyncOperatorHandler(); - ~AsyncOperatorHandler(); + ~AsyncOperatorHandler() = default; boost::fibers::future process_async_generator(PyObjectHolder asyncgen); private: pybind11::module_ m_asyncio; - PyHolder m_loop; - std::thread m_loop_thread; - std::atomic m_loop_ct = false; }; class OperatorsProxy diff --git a/python/mrc/_pymrc/src/node.cpp b/python/mrc/_pymrc/src/node.cpp index 52aa32664..e66a71cad 100644 --- a/python/mrc/_pymrc/src/node.cpp +++ b/python/mrc/_pymrc/src/node.cpp @@ -17,20 +17,88 @@ #include "pymrc/node.hpp" +#include "pymrc/executor.hpp" + #include +#include +#include +#include + namespace mrc::pymrc { -PythonNodeContext::PythonNodeContext(std::size_t rank, std::size_t size) : mrc::runnable::Context(rank, size) +PythonNodeLoopHandle::PythonNodeLoopHandle() { pybind11::gil_scoped_acquire acquire; - pybind11::print("PythonNodeContext::ctor"); + + auto asyncio = pybind11::module_::import("asyncio"); + + auto setup_debugging = create_gil_initializer(); + + m_loop = asyncio.attr("new_event_loop")(); + m_loop_ct = false; + m_loop_thread = std::thread([loop = m_loop, &ct = m_loop_ct, setup_debugging = std::move(setup_debugging)]() { + setup_debugging(); + + while (not ct) + { + { + // run event loop once + pybind11::gil_scoped_acquire acquire; + loop.attr("stop")(); + loop.attr("run_forever")(); + } + + std::this_thread::yield(); + } + }); +} + +PythonNodeLoopHandle::~PythonNodeLoopHandle() +{ + if (m_loop_thread.joinable()) + { + m_loop_ct = true; + m_loop_thread.join(); + } +} + +uint32_t PythonNodeLoopHandle::inc_ref() +{ + return ++m_references; +} + +uint32_t PythonNodeLoopHandle::dec_ref() +{ + return --m_references; +} + +PyHolder PythonNodeLoopHandle::get_asyncio_event_loop() +{ + return m_loop; +} + +PythonNodeContext::PythonNodeContext(std::size_t rank, std::size_t size) : mrc::runnable::Context(rank, size) +{ + if (m_loop_handle == nullptr) + { + m_loop_handle = std::make_unique(); + } + + m_loop_handle->inc_ref(); } PythonNodeContext::~PythonNodeContext() { - pybind11::gil_scoped_acquire acquire; - pybind11::print("PythonNodeContext::dtor"); + if (m_loop_handle != nullptr and m_loop_handle->dec_ref() == 0) + { + m_loop_handle.reset(); + } +} + +PyHolder PythonNodeContext::get_asyncio_event_loop() +{ + return m_loop_handle->get_asyncio_event_loop(); } } // namespace mrc::pymrc diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 01c0bef03..93a6955f8 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -18,9 +18,11 @@ #include "pymrc/operators.hpp" #include "pymrc/executor.hpp" +#include "pymrc/node.hpp" #include "pymrc/types.hpp" #include "pymrc/utilities/acquire_gil.hpp" #include "pymrc/utilities/function_wrappers.hpp" +#include "mrc/runnable/context.hpp" #include #include @@ -182,53 +184,18 @@ PythonOperator OperatorsProxy::flatten() AsyncOperatorHandler::AsyncOperatorHandler() { - py::gil_scoped_acquire acquire; - - m_asyncio = pybind11::module_::import("asyncio"); - - try - { - m_loop = m_asyncio.attr("get_event_loop")(); - } catch (std::runtime_error ex) - { - auto setup_debugging = create_gil_initializer(); - - m_loop = m_asyncio.attr("new_event_loop")(); - m_loop_thread = std::thread( - [&loop_ct = m_loop_ct, loop = m_loop](std::function setup_debugging) { - setup_debugging(); - - while (not loop_ct) - { - { - // run event loop once - py::gil_scoped_acquire acquire; - loop.attr("stop")(); - loop.attr("run_forever")(); - } - - std::this_thread::yield(); - } - }, - std::move(setup_debugging)); - } -} - -AsyncOperatorHandler::~AsyncOperatorHandler() -{ - if (m_loop_thread.joinable()) - { - m_loop_ct = true; - m_loop_thread.join(); - } + pybind11::gil_scoped_acquire acquire; + m_asyncio = py::module_::import("asyncio"); } boost::fibers::future AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen) { py::gil_scoped_acquire acquire; + auto& ctx = runnable::Context::get_runtime_context().as(); + auto loop = ctx.get_asyncio_event_loop(); auto task = asyncgen.attr("__anext__")(); - auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, m_loop); + auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, loop); auto result = std::make_unique>(); auto result_future = result->get_future(); From 5df343f08c48b66282440fd44f0fe55a6ae8f371 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Mon, 25 Sep 2023 23:22:22 +0000 Subject: [PATCH 14/23] broken --- cpp/mrc/include/mrc/runnable/context.hpp | 11 +++++-- cpp/mrc/include/mrc/runnable/engine.hpp | 2 ++ .../include/mrc/runnable/launch_control.hpp | 30 +++++++++++-------- cpp/mrc/include/mrc/runnable/runner.hpp | 2 +- cpp/mrc/include/mrc/segment/context.hpp | 4 +-- cpp/mrc/src/internal/runnable/engine.cpp | 4 --- cpp/mrc/src/public/runnable/context.cpp | 30 +++++++++++++++---- cpp/mrc/src/public/runnable/runner.cpp | 2 +- python/mrc/_pymrc/include/pymrc/node.hpp | 3 +- python/mrc/_pymrc/src/node.cpp | 5 +++- python/mrc/_pymrc/src/operators.cpp | 5 ++++ 11 files changed, 68 insertions(+), 30 deletions(-) diff --git a/cpp/mrc/include/mrc/runnable/context.hpp b/cpp/mrc/include/mrc/runnable/context.hpp index 090804972..b988fbcae 100644 --- a/cpp/mrc/include/mrc/runnable/context.hpp +++ b/cpp/mrc/include/mrc/runnable/context.hpp @@ -17,6 +17,7 @@ #pragma once +#include "mrc/types.hpp" #include #include @@ -27,6 +28,7 @@ namespace mrc::runnable { class Runner; +class IEngine; enum class EngineType; /** @@ -41,7 +43,7 @@ class Context { public: Context() = delete; - Context(std::size_t rank, std::size_t size); + Context(const Runner& runner, IEngine& engine, std::size_t rank); virtual ~Context() = default; EngineType execution_context() const; @@ -54,6 +56,8 @@ class Context void barrier(); void yield(); + Future launch_task(std::function task); + const std::string& info() const; template @@ -69,7 +73,7 @@ class Context void set_exception(std::exception_ptr exception_ptr); protected: - void init(const Runner& runner); + void start(); bool status() const; void finish(); virtual void init_info(std::stringstream& ss); @@ -79,7 +83,8 @@ class Context std::size_t m_size; std::string m_info{"Uninitialized Context"}; std::exception_ptr m_exception_ptr{nullptr}; - const Runner* m_runner{nullptr}; + const Runner& m_runner; + IEngine& m_engine; virtual void do_lock() = 0; virtual void do_unlock() = 0; diff --git a/cpp/mrc/include/mrc/runnable/engine.hpp b/cpp/mrc/include/mrc/runnable/engine.hpp index 1c8f95212..73852fc18 100644 --- a/cpp/mrc/include/mrc/runnable/engine.hpp +++ b/cpp/mrc/include/mrc/runnable/engine.hpp @@ -22,6 +22,7 @@ #include "mrc/core/fiber_meta_data.hpp" #include "mrc/core/fiber_pool.hpp" #include "mrc/core/task_queue.hpp" +#include "mrc/runnable/context.hpp" #include "mrc/runnable/launch_options.hpp" #include "mrc/types.hpp" @@ -54,6 +55,7 @@ class IEngine virtual Future launch_task(std::function task) = 0; friend Runner; + friend Context; }; /** diff --git a/cpp/mrc/include/mrc/runnable/launch_control.hpp b/cpp/mrc/include/mrc/runnable/launch_control.hpp index 67371a218..de4b9bdf1 100644 --- a/cpp/mrc/include/mrc/runnable/launch_control.hpp +++ b/cpp/mrc/include/mrc/runnable/launch_control.hpp @@ -103,6 +103,9 @@ class LaunchControl final // engines are out way of running some task on the specified backend std::shared_ptr engines = build_engines(options); + // create runner + auto runner = runnable::make_runner(std::move(runnable)); + // make contexts std::vector> contexts; if constexpr (is_fiber_runnable_v) @@ -113,6 +116,7 @@ class LaunchControl final "ThreadEngine"; contexts = make_contexts>>( + *runner, *engines, std::forward(context_args)...); } @@ -123,6 +127,7 @@ class LaunchControl final "to be run on a " "FiberEngine"; contexts = make_contexts>>( + *runner, *engines, std::forward(context_args)...); } @@ -132,12 +137,14 @@ class LaunchControl final if (backend == EngineType::Fiber) { contexts = make_contexts>>( + *runner, *engines, std::forward(context_args)...); } else if (backend == EngineType::Thread) { contexts = make_contexts>>( + *runner, *engines, std::forward(context_args)...); } @@ -147,9 +154,6 @@ class LaunchControl final } } - // create runner - auto runner = runnable::make_runner(std::move(runnable)); - // construct the launcher return std::make_unique(std::move(runner), std::move(contexts), std::move(engines)); } @@ -204,6 +208,9 @@ class LaunchControl final // engines are out way of running some task on the specified backend std::shared_ptr engines = build_engines(options); + // create runner + auto runner = runnable::make_runner(std::move(runnable)); + // make contexts std::vector> contexts; if constexpr (is_fiber_runnable_v) @@ -212,7 +219,7 @@ class LaunchControl final "FiberRunnable to " "be run on a " "ThreadEngine"; - contexts = make_contexts(*engines, std::forward(context_args)...); + contexts = make_contexts(*runner, *engines, std::forward(context_args)...); } else if constexpr (is_thread_context_v) { @@ -220,19 +227,21 @@ class LaunchControl final "ThreadRunnable " "to be run on a " "FiberEngine"; - contexts = make_contexts(*engines, std::forward(context_args)...); + contexts = make_contexts(*runner, *engines, std::forward(context_args)...); } else { auto backend = get_engine_factory(options.engine_factory_name).backend(); if (backend == EngineType::Fiber) { - contexts = make_contexts>(*engines, + contexts = make_contexts>(*runner, + *engines, std::forward(context_args)...); } else if (backend == EngineType::Thread) { - contexts = make_contexts>(*engines, + contexts = make_contexts>(*runner, + *engines, std::forward(context_args)...); } else @@ -241,9 +250,6 @@ class LaunchControl final } } - // create runner - auto runner = runnable::make_runner(std::move(runnable)); - // construct the launcher return std::make_unique(std::move(runner), std::move(contexts), std::move(engines)); } @@ -325,14 +331,14 @@ class LaunchControl final * @return auto */ template - auto make_contexts(const IEngines& engines, ArgsT&&... args) + auto make_contexts(const Runner& runner, const IEngines& engines, ArgsT&&... args) { const auto size = engines.size(); std::vector> contexts; auto resources = std::make_shared(size); for (std::size_t i = 0; i < size; ++i) { - contexts.push_back(std::make_shared(resources, i, size, args...)); + contexts.push_back(std::make_shared(resources, runner, *engines.launchers()[i], i, args...)); } return std::move(contexts); } diff --git a/cpp/mrc/include/mrc/runnable/runner.hpp b/cpp/mrc/include/mrc/runnable/runner.hpp index c2539b601..c3cf12f9b 100644 --- a/cpp/mrc/include/mrc/runnable/runner.hpp +++ b/cpp/mrc/include/mrc/runnable/runner.hpp @@ -248,7 +248,7 @@ class SpecializedRunner : public Runner auto resources = std::make_shared(size); for (std::size_t i = 0; i < size; ++i) { - contexts.push_back(std::make_shared(resources, i, size, std::forward(args)...)); + contexts.push_back(std::make_shared(resources, i, std::forward(args)...)); } return std::move(contexts); } diff --git a/cpp/mrc/include/mrc/segment/context.hpp b/cpp/mrc/include/mrc/segment/context.hpp index 485f05d20..397572f0f 100644 --- a/cpp/mrc/include/mrc/segment/context.hpp +++ b/cpp/mrc/include/mrc/segment/context.hpp @@ -28,8 +28,8 @@ class Context : public ContextT { public: template - Context(std::size_t rank, std::size_t size, std::string name, ArgsT&&... args) : - ContextT(std::move(rank), std::move(size), std::forward(args)...), + Context(const mrc::runnable::Runner& runner, mrc::runnable::IEngine& engine, std::size_t rank, std::string name, ArgsT&&... args) : + ContextT(runner, engine, std::move(rank), std::forward(args)...), m_name(std::move(name)) { static_assert(std::is_base_of_v, "ContextT must derive from Context"); diff --git a/cpp/mrc/src/internal/runnable/engine.cpp b/cpp/mrc/src/internal/runnable/engine.cpp index 4ebe57491..ee12c9ba7 100644 --- a/cpp/mrc/src/internal/runnable/engine.cpp +++ b/cpp/mrc/src/internal/runnable/engine.cpp @@ -30,10 +30,6 @@ namespace mrc::runnable { Future Engine::launch_task(std::function task) { std::lock_guard lock(m_mutex); - if (m_launched) - { - LOG(FATAL) << "detected attempted reuse of a runnable::Engine; this is a fatal error"; - } m_launched = true; return do_launch_task(std::move(task)); } diff --git a/cpp/mrc/src/public/runnable/context.cpp b/cpp/mrc/src/public/runnable/context.cpp index 8e1aa510e..9143e1091 100644 --- a/cpp/mrc/src/public/runnable/context.cpp +++ b/cpp/mrc/src/public/runnable/context.cpp @@ -47,7 +47,12 @@ struct FiberLocalContext } // namespace -Context::Context(std::size_t rank, std::size_t size) : m_rank(rank), m_size(size) {} +Context::Context(const Runner& runner, IEngine& engine, std::size_t rank) : + m_runner(runner), + m_engine(engine), + m_rank(rank), + m_size(runner.instances().size()) +{} EngineType Context::execution_context() const { @@ -93,7 +98,24 @@ void Context::yield() do_yield(); } -void Context::init(const Runner& runner) +Future Context::launch_task(std::function task) +{ + boost::fibers::async([this, task = std::move(task)]() { + auto& fiber_local = FiberLocalContext::get(); + fiber_local.reset(new FiberLocalContext()); + fiber_local->m_context = this; + + try + { + task(); + } catch (...) + { + set_exception(std::current_exception()); + } + }); +} + +void Context::start() { auto& fiber_local = FiberLocalContext::get(); fiber_local.reset(new FiberLocalContext()); @@ -102,8 +124,6 @@ void Context::init(const Runner& runner) std::stringstream ss; this->init_info(ss); m_info = ss.str(); - - m_runner = &runner; } void Context::finish() @@ -127,7 +147,7 @@ void Context::set_exception(std::exception_ptr exception_ptr) if (m_exception_ptr == nullptr) { m_exception_ptr = std::move(std::current_exception()); - m_runner->kill(); + m_runner.kill(); } } } diff --git a/cpp/mrc/src/public/runnable/runner.cpp b/cpp/mrc/src/public/runnable/runner.cpp index 5545fa628..bd8aa5a74 100644 --- a/cpp/mrc/src/public/runnable/runner.cpp +++ b/cpp/mrc/src/public/runnable/runner.cpp @@ -130,7 +130,7 @@ void Runner::enqueue(std::shared_ptr launcher, std::vectorlaunch_task([this, context, &instance] { - context->init(*this); + context->start(); update_state(context->rank(), State::Running); instance.m_live_promise.set_value(); m_runnable->main(*context); diff --git a/python/mrc/_pymrc/include/pymrc/node.hpp b/python/mrc/_pymrc/include/pymrc/node.hpp index f785431e1..c61e78e0e 100644 --- a/python/mrc/_pymrc/include/pymrc/node.hpp +++ b/python/mrc/_pymrc/include/pymrc/node.hpp @@ -33,6 +33,7 @@ #include "mrc/node/rx_sink.hpp" #include "mrc/node/rx_source.hpp" #include "mrc/runnable/context.hpp" +#include "mrc/runnable/engine.hpp" #include #include @@ -333,7 +334,7 @@ class PythonNodeLoopHandle class PythonNodeContext : public mrc::runnable::Context { public: - PythonNodeContext(std::size_t rank, std::size_t size); + PythonNodeContext(const mrc::runnable::Runner& runner, mrc::runnable::IEngine& engine, std::size_t rank); ~PythonNodeContext() override; PyHolder get_asyncio_event_loop(); diff --git a/python/mrc/_pymrc/src/node.cpp b/python/mrc/_pymrc/src/node.cpp index e66a71cad..27e8ea1c5 100644 --- a/python/mrc/_pymrc/src/node.cpp +++ b/python/mrc/_pymrc/src/node.cpp @@ -78,7 +78,10 @@ PyHolder PythonNodeLoopHandle::get_asyncio_event_loop() return m_loop; } -PythonNodeContext::PythonNodeContext(std::size_t rank, std::size_t size) : mrc::runnable::Context(rank, size) +PythonNodeContext::PythonNodeContext(const mrc::runnable::Runner& runner, + mrc::runnable::IEngine& engine, + std::size_t rank) : + mrc::runnable::Context(runner, engine, rank) { if (m_loop_handle == nullptr) { diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 93a6955f8..62dc14704 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -200,6 +200,11 @@ boost::fibers::future AsyncOperatorHandler::process_async_genera auto result_future = result->get_future(); + // runnable::Context::get_runtime_context().launch_task([](){ + // pybind11::gil_scoped_acquire acquire; + // pybind11::print("Hello from Context::launch_task"); + // }); + future.attr("add_done_callback")(py::cpp_function([result = std::move(result)](py::object future) { try { From 233421070dbe08a5f1a9966010dd14426fd8a7aa Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 04:03:32 +0000 Subject: [PATCH 15/23] Context::launch_fiber --- cpp/mrc/include/mrc/runnable/context.hpp | 5 +++-- cpp/mrc/include/mrc/runnable/launch_control.hpp | 3 ++- cpp/mrc/include/mrc/segment/context.hpp | 9 +++++++-- cpp/mrc/src/public/runnable/context.cpp | 9 ++++----- python/mrc/_pymrc/include/pymrc/node.hpp | 5 ++++- python/mrc/_pymrc/src/node.cpp | 5 +++-- python/mrc/_pymrc/src/operators.cpp | 13 +++++++------ 7 files changed, 30 insertions(+), 19 deletions(-) diff --git a/cpp/mrc/include/mrc/runnable/context.hpp b/cpp/mrc/include/mrc/runnable/context.hpp index b988fbcae..daf3c7e29 100644 --- a/cpp/mrc/include/mrc/runnable/context.hpp +++ b/cpp/mrc/include/mrc/runnable/context.hpp @@ -18,6 +18,7 @@ #pragma once #include "mrc/types.hpp" + #include #include @@ -43,7 +44,7 @@ class Context { public: Context() = delete; - Context(const Runner& runner, IEngine& engine, std::size_t rank); + Context(const Runner& runner, IEngine& engine, std::size_t rank, std::size_t size); virtual ~Context() = default; EngineType execution_context() const; @@ -56,7 +57,7 @@ class Context void barrier(); void yield(); - Future launch_task(std::function task); + Future launch_fiber(std::function task); const std::string& info() const; diff --git a/cpp/mrc/include/mrc/runnable/launch_control.hpp b/cpp/mrc/include/mrc/runnable/launch_control.hpp index de4b9bdf1..2fddf3b13 100644 --- a/cpp/mrc/include/mrc/runnable/launch_control.hpp +++ b/cpp/mrc/include/mrc/runnable/launch_control.hpp @@ -338,7 +338,8 @@ class LaunchControl final auto resources = std::make_shared(size); for (std::size_t i = 0; i < size; ++i) { - contexts.push_back(std::make_shared(resources, runner, *engines.launchers()[i], i, args...)); + contexts.push_back( + std::make_shared(resources, runner, *engines.launchers()[i], i, size, args...)); } return std::move(contexts); } diff --git a/cpp/mrc/include/mrc/segment/context.hpp b/cpp/mrc/include/mrc/segment/context.hpp index 397572f0f..627cd793d 100644 --- a/cpp/mrc/include/mrc/segment/context.hpp +++ b/cpp/mrc/include/mrc/segment/context.hpp @@ -28,8 +28,13 @@ class Context : public ContextT { public: template - Context(const mrc::runnable::Runner& runner, mrc::runnable::IEngine& engine, std::size_t rank, std::string name, ArgsT&&... args) : - ContextT(runner, engine, std::move(rank), std::forward(args)...), + Context(const mrc::runnable::Runner& runner, + mrc::runnable::IEngine& engine, + std::size_t rank, + std::size_t size, + std::string name, + ArgsT&&... args) : + ContextT(runner, engine, std::move(rank), std::move(size), std::forward(args)...), m_name(std::move(name)) { static_assert(std::is_base_of_v, "ContextT must derive from Context"); diff --git a/cpp/mrc/src/public/runnable/context.cpp b/cpp/mrc/src/public/runnable/context.cpp index 9143e1091..24aed6b4e 100644 --- a/cpp/mrc/src/public/runnable/context.cpp +++ b/cpp/mrc/src/public/runnable/context.cpp @@ -47,11 +47,11 @@ struct FiberLocalContext } // namespace -Context::Context(const Runner& runner, IEngine& engine, std::size_t rank) : +Context::Context(const Runner& runner, IEngine& engine, std::size_t rank, std::size_t size) : m_runner(runner), m_engine(engine), m_rank(rank), - m_size(runner.instances().size()) + m_size(size) {} EngineType Context::execution_context() const @@ -98,13 +98,12 @@ void Context::yield() do_yield(); } -Future Context::launch_task(std::function task) +Future Context::launch_fiber(std::function task) { - boost::fibers::async([this, task = std::move(task)]() { + return boost::fibers::async([this, task]() { auto& fiber_local = FiberLocalContext::get(); fiber_local.reset(new FiberLocalContext()); fiber_local->m_context = this; - try { task(); diff --git a/python/mrc/_pymrc/include/pymrc/node.hpp b/python/mrc/_pymrc/include/pymrc/node.hpp index c61e78e0e..5d9cbc1b3 100644 --- a/python/mrc/_pymrc/include/pymrc/node.hpp +++ b/python/mrc/_pymrc/include/pymrc/node.hpp @@ -334,7 +334,10 @@ class PythonNodeLoopHandle class PythonNodeContext : public mrc::runnable::Context { public: - PythonNodeContext(const mrc::runnable::Runner& runner, mrc::runnable::IEngine& engine, std::size_t rank); + PythonNodeContext(const mrc::runnable::Runner& runner, + mrc::runnable::IEngine& engine, + std::size_t rank, + std::size_t size); ~PythonNodeContext() override; PyHolder get_asyncio_event_loop(); diff --git a/python/mrc/_pymrc/src/node.cpp b/python/mrc/_pymrc/src/node.cpp index 27e8ea1c5..753b38f61 100644 --- a/python/mrc/_pymrc/src/node.cpp +++ b/python/mrc/_pymrc/src/node.cpp @@ -80,8 +80,9 @@ PyHolder PythonNodeLoopHandle::get_asyncio_event_loop() PythonNodeContext::PythonNodeContext(const mrc::runnable::Runner& runner, mrc::runnable::IEngine& engine, - std::size_t rank) : - mrc::runnable::Context(runner, engine, rank) + std::size_t rank, + std::size_t size) : + mrc::runnable::Context(runner, engine, rank, size) { if (m_loop_handle == nullptr) { diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 62dc14704..778ba32d6 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -22,6 +22,7 @@ #include "pymrc/types.hpp" #include "pymrc/utilities/acquire_gil.hpp" #include "pymrc/utilities/function_wrappers.hpp" + #include "mrc/runnable/context.hpp" #include @@ -192,18 +193,18 @@ boost::fibers::future AsyncOperatorHandler::process_async_genera { py::gil_scoped_acquire acquire; - auto& ctx = runnable::Context::get_runtime_context().as(); - auto loop = ctx.get_asyncio_event_loop(); + auto& ctx = runnable::Context::get_runtime_context().as(); + auto loop = ctx.get_asyncio_event_loop(); auto task = asyncgen.attr("__anext__")(); auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, loop); auto result = std::make_unique>(); auto result_future = result->get_future(); - // runnable::Context::get_runtime_context().launch_task([](){ - // pybind11::gil_scoped_acquire acquire; - // pybind11::print("Hello from Context::launch_task"); - // }); + runnable::Context::get_runtime_context().launch_fiber([]() { + pybind11::gil_scoped_acquire acquire; + pybind11::print("Hello from Context::launch_Fiber"); + }); future.attr("add_done_callback")(py::cpp_function([result = std::move(result)](py::object future) { try From ac6eaba533dc16294ea0cb78b2fd447fa0ce4968 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 04:51:52 +0000 Subject: [PATCH 16/23] flat_map_async --- python/mrc/_pymrc/include/pymrc/operators.hpp | 8 +- python/mrc/_pymrc/src/operators.cpp | 89 +++++++++++-------- python/mrc/core/operators.cpp | 2 +- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 62907e713..a01dec31d 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -63,10 +63,14 @@ class AsyncOperatorHandler AsyncOperatorHandler(); ~AsyncOperatorHandler() = default; - boost::fibers::future process_async_generator(PyObjectHolder asyncgen); + void process_async_generator(PyObjectHolder asyncgen, PyObjectSubscriber sink); + + void wait_completed() const; private: + boost::fibers::future future_from_async_generator(PyObjectHolder asyncgen); pybind11::module_ m_asyncio; + std::atomic m_outstanding; }; class OperatorsProxy @@ -75,7 +79,7 @@ class OperatorsProxy static PythonOperator build(PyFuncHolder build_fn); static PythonOperator filter(PyFuncHolder filter_fn); static PythonOperator flatten(); - static PythonOperator concat_map_async(PyFuncHolder flatmap_fn); + static PythonOperator flat_map_async(PyFuncHolder flatmap_fn); static PythonOperator map(OnDataFunction map_fn); static PythonOperator on_completed(PyFuncHolder()> finally_fn); static PythonOperator pairwise(); diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 778ba32d6..90c644cee 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -23,6 +23,7 @@ #include "pymrc/utilities/acquire_gil.hpp" #include "pymrc/utilities/function_wrappers.hpp" +#include "mrc/core/utils.hpp" #include "mrc/runnable/context.hpp" #include @@ -189,7 +190,52 @@ AsyncOperatorHandler::AsyncOperatorHandler() m_asyncio = py::module_::import("asyncio"); } -boost::fibers::future AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen) +void AsyncOperatorHandler::wait_completed() const +{ + while (m_outstanding > 0) + { + boost::this_fiber::yield(); + } +} + +void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyObjectSubscriber sink) +{ + ++m_outstanding; + runnable::Context::get_runtime_context().launch_fiber([this, sink, asyncgen]() { + while (sink.is_subscribed()) + { + using namespace std::chrono_literals; + + auto gen_yielded = this->future_from_async_generator(asyncgen); + + while (gen_yielded.wait_for(0s) != boost::fibers::future_status::ready) + { + boost::this_fiber::yield(); + + if (not sink.is_subscribed()) + { + return; + } + } + + try + { + sink.on_next(gen_yielded.get()); + } catch (pybind11::error_already_set& ex) + { + if (ex.matches(PyExc_StopAsyncIteration)) + { + --m_outstanding; + return; + } + sink.on_error(std::current_exception()); + --m_outstanding; + } + } + }); +} + +boost::fibers::future AsyncOperatorHandler::future_from_async_generator(PyObjectHolder asyncgen) { py::gil_scoped_acquire acquire; @@ -201,11 +247,6 @@ boost::fibers::future AsyncOperatorHandler::process_async_genera auto result_future = result->get_future(); - runnable::Context::get_runtime_context().launch_fiber([]() { - pybind11::gil_scoped_acquire acquire; - pybind11::print("Hello from Context::launch_Fiber"); - }); - future.attr("add_done_callback")(py::cpp_function([result = std::move(result)](py::object future) { try { @@ -222,10 +263,10 @@ boost::fibers::future AsyncOperatorHandler::process_async_genera return result_future; } -PythonOperator OperatorsProxy::concat_map_async(PyFuncHolder flatmap_fn) +PythonOperator OperatorsProxy::flat_map_async(PyFuncHolder flatmap_fn) { // Build and return the map operator - return {"concat_map_async", [=](PyObjectObservable source) { + return {"flat_map_async", [=](PyObjectObservable source) { return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { auto async_handler = std::make_unique(); source.subscribe( @@ -234,42 +275,16 @@ PythonOperator OperatorsProxy::concat_map_async(PyFuncHolder(); auto asyncgen = flatmap_fn(std::move(value)); acquire.reset(); - while (sink.is_subscribed()) - { - using namespace std::chrono_literals; - - auto gen_yielded = async_handler.process_async_generator(asyncgen); - - while (gen_yielded.wait_for(0s) != boost::fibers::future_status::ready) - { - boost::this_fiber::yield(); - - if (not sink.is_subscribed()) - { - return; - } - } - - try - { - sink.on_next(gen_yielded.get()); - } catch (pybind11::error_already_set& ex) - { - if (ex.matches(PyExc_StopAsyncIteration)) - { - return; - } - - sink.on_error(std::current_exception()); - } - } + async_handler.process_async_generator(asyncgen, sink); }, [sink, &async_handler = *async_handler](std::exception_ptr ex) { // Forward + async_handler.wait_completed(); sink.on_error(std::current_exception()); }, [sink, &async_handler = *async_handler]() { // Forward + async_handler.wait_completed(); sink.on_completed(); }); }); diff --git a/python/mrc/core/operators.cpp b/python/mrc/core/operators.cpp index 7a3d3de89..411059131 100644 --- a/python/mrc/core/operators.cpp +++ b/python/mrc/core/operators.cpp @@ -55,7 +55,7 @@ PYBIND11_MODULE(operators, py_mod) py_mod.def("build", &OperatorsProxy::build); py_mod.def("filter", &OperatorsProxy::filter); py_mod.def("flatten", &OperatorsProxy::flatten); - py_mod.def("concat_map_async", &OperatorsProxy::concat_map_async); + py_mod.def("flat_map_async", &OperatorsProxy::flat_map_async); py_mod.def("map", &OperatorsProxy::map); py_mod.def("on_completed", &OperatorsProxy::on_completed); py_mod.def("pairwise", &OperatorsProxy::pairwise); From 36a74f880d50af28c0d9ea649f60db311eb6d6bc Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 14:16:35 +0000 Subject: [PATCH 17/23] asyncgen loop shutdown --- python/mrc/_pymrc/include/pymrc/node.hpp | 2 +- python/mrc/_pymrc/include/pymrc/operators.hpp | 4 +++- python/mrc/_pymrc/src/node.cpp | 5 +++++ python/mrc/_pymrc/src/operators.cpp | 17 ++++++++++++++--- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/node.hpp b/python/mrc/_pymrc/include/pymrc/node.hpp index 5d9cbc1b3..a112bded7 100644 --- a/python/mrc/_pymrc/include/pymrc/node.hpp +++ b/python/mrc/_pymrc/include/pymrc/node.hpp @@ -343,7 +343,7 @@ class PythonNodeContext : public mrc::runnable::Context PyHolder get_asyncio_event_loop(); private: - // this was intended to be a thread-specific-pointer, but I couldn't get boost::thread to link. + // TODO(cwharris): this should be a thread-specific pointer, std::unique_ptr m_loop_handle; }; diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index a01dec31d..55c88d514 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -66,11 +66,13 @@ class AsyncOperatorHandler void process_async_generator(PyObjectHolder asyncgen, PyObjectSubscriber sink); void wait_completed() const; + void wait_error(); private: boost::fibers::future future_from_async_generator(PyObjectHolder asyncgen); pybind11::module_ m_asyncio; - std::atomic m_outstanding; + uint32_t m_outstanding = 0; + bool m_cancelled = false; }; class OperatorsProxy diff --git a/python/mrc/_pymrc/src/node.cpp b/python/mrc/_pymrc/src/node.cpp index 753b38f61..8f0c63773 100644 --- a/python/mrc/_pymrc/src/node.cpp +++ b/python/mrc/_pymrc/src/node.cpp @@ -51,6 +51,11 @@ PythonNodeLoopHandle::PythonNodeLoopHandle() std::this_thread::yield(); } + + pybind11::gil_scoped_acquire acquire; + auto shutdown = loop.attr("shutdown_asyncgens")(); + loop.attr("run_until_complete")(shutdown); + loop.attr("close")(); }); } diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 90c644cee..722584af6 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -198,10 +198,23 @@ void AsyncOperatorHandler::wait_completed() const } } +void AsyncOperatorHandler::wait_error() +{ + m_cancelled = true; + wait_completed(); +} + void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyObjectSubscriber sink) { + if (m_cancelled) + { + return; // do we need to + } ++m_outstanding; runnable::Context::get_runtime_context().launch_fiber([this, sink, asyncgen]() { + auto unwinder = Unwinder([this]() { + --m_outstanding; + }); while (sink.is_subscribed()) { using namespace std::chrono_literals; @@ -225,11 +238,9 @@ void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyOb { if (ex.matches(PyExc_StopAsyncIteration)) { - --m_outstanding; return; } sink.on_error(std::current_exception()); - --m_outstanding; } } }); @@ -279,7 +290,7 @@ PythonOperator OperatorsProxy::flat_map_async(PyFuncHolder Date: Tue, 26 Sep 2023 15:55:38 +0000 Subject: [PATCH 18/23] flat_map_async test --- python/tests/test_operators.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/python/tests/test_operators.py b/python/tests/test_operators.py index 9fa939b3a..dc98b6e5f 100644 --- a/python/tests/test_operators.py +++ b/python/tests/test_operators.py @@ -130,6 +130,37 @@ def node_fn(input: mrc.Observable, output: mrc.Subscriber): assert actual == expected +def test_flat_map_async(run_segment): + + input_data = [('a', 5), ('b', 1), ('c', 3)] + expected = [ + ('a', 0), ('a', 1), ('a', 2), ('a', 3), ('a', 4), ('b', 0), ('c', 0), ('c', 1), ('c', 2) + ] + + import random + random.shuffle(input_data) # the output order is not dictated by the input order + + async def generate(value): + name, count = value + for i in range(count): + yield (name, i) + + def node_fn(input: mrc.Observable, output: mrc.Subscriber): + input.pipe(ops.flat_map_async(generate)).subscribe(output) + + actual, raised_error = run_segment(input_data, node_fn) + + assert set(actual) == set(expected) + + def assert_sequential(name, actual): + # the output of individual generators must be sequential + import itertools + for i, (name, value) in zip(itertools.count(), filter(lambda pair: pair[0] == name, actual)): + assert i == value + + assert_sequential('a', actual) + assert_sequential('b', actual) + assert_sequential('c', actual) def test_filter(run_segment): From e74838e9ab661b78f28705ae28e87dec929f3baf Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 15:58:13 +0000 Subject: [PATCH 19/23] fix styles --- cpp/mrc/include/mrc/runnable/detail/type_traits.hpp | 2 +- cpp/mrc/include/mrc/segment/builder.hpp | 4 ++-- python/mrc/_pymrc/include/pymrc/operators.hpp | 2 +- python/tests/test_operators.py | 8 ++++---- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp b/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp index 0e6dab75a..57444696f 100644 --- a/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp +++ b/cpp/mrc/include/mrc/runnable/detail/type_traits.hpp @@ -85,7 +85,7 @@ static auto unwrap_context(l4_concept c, ThreadContext& t) return std::make_pair(self{}, self{}); } -template +template static auto unwrap_context(l2_concept c, T& t) { return std::make_pair(self{}, self{}); diff --git a/cpp/mrc/include/mrc/segment/builder.hpp b/cpp/mrc/include/mrc/segment/builder.hpp index d03ce8b04..b1db230e9 100644 --- a/cpp/mrc/include/mrc/segment/builder.hpp +++ b/cpp/mrc/include/mrc/segment/builder.hpp @@ -278,7 +278,7 @@ class IBuilder typename... ArgsT> auto make_node(std::string name, ArgsT&&... ops); - template + template auto make_node_explicit(std::string name, ArgsT&&... ops); /** @@ -439,7 +439,7 @@ auto IBuilder::make_node(std::string name, ArgsT&&... ops) return construct_object>(name, std::forward(ops)...); } -template +template auto IBuilder::make_node_explicit(std::string name, ArgsT&&... ops) { return construct_object(name, std::forward(ops)...); diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 55c88d514..ae66a21a0 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -72,7 +72,7 @@ class AsyncOperatorHandler boost::fibers::future future_from_async_generator(PyObjectHolder asyncgen); pybind11::module_ m_asyncio; uint32_t m_outstanding = 0; - bool m_cancelled = false; + bool m_cancelled = false; }; class OperatorsProxy diff --git a/python/tests/test_operators.py b/python/tests/test_operators.py index dc98b6e5f..b0dbe309f 100644 --- a/python/tests/test_operators.py +++ b/python/tests/test_operators.py @@ -130,15 +130,14 @@ def node_fn(input: mrc.Observable, output: mrc.Subscriber): assert actual == expected + def test_flat_map_async(run_segment): input_data = [('a', 5), ('b', 1), ('c', 3)] - expected = [ - ('a', 0), ('a', 1), ('a', 2), ('a', 3), ('a', 4), ('b', 0), ('c', 0), ('c', 1), ('c', 2) - ] + expected = [('a', 0), ('a', 1), ('a', 2), ('a', 3), ('a', 4), ('b', 0), ('c', 0), ('c', 1), ('c', 2)] import random - random.shuffle(input_data) # the output order is not dictated by the input order + random.shuffle(input_data) # the output order is not dictated by the input order async def generate(value): name, count = value @@ -162,6 +161,7 @@ def assert_sequential(name, actual): assert_sequential('b', actual) assert_sequential('c', actual) + def test_filter(run_segment): input_data = [1, 2, 3, 4, 5, "one", "two", "three", "four", "five", 1, "two", 3] From a7cd4af11f629a94609d6b7f1d07361cece4d5a9 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 17:32:43 +0000 Subject: [PATCH 20/23] iwyu fixes --- cpp/mrc/include/mrc/runnable/context.hpp | 15 ++++++++------- cpp/mrc/src/internal/runnable/engine.cpp | 9 +++------ cpp/mrc/src/public/runnable/context.cpp | 17 +++++++++-------- python/mrc/_pymrc/include/pymrc/node.hpp | 10 +++++++++- python/mrc/_pymrc/src/node.cpp | 16 +++++++++++++--- python/mrc/_pymrc/src/operators.cpp | 12 ++++-------- python/mrc/_pymrc/src/segment.cpp | 5 ----- 7 files changed, 46 insertions(+), 38 deletions(-) diff --git a/cpp/mrc/include/mrc/runnable/context.hpp b/cpp/mrc/include/mrc/runnable/context.hpp index daf3c7e29..b26e6f8ed 100644 --- a/cpp/mrc/include/mrc/runnable/context.hpp +++ b/cpp/mrc/include/mrc/runnable/context.hpp @@ -17,19 +17,20 @@ #pragma once -#include "mrc/types.hpp" +#include "mrc/types.hpp" // for Future -#include +#include // for CHECK, COMPACT_GOOGLE_LOG_FATAL, LogMessag... -#include -#include -#include -#include +#include // for size_t +#include // for exception_ptr +#include // for function +#include // for stringstream +#include // for allocator, string namespace mrc::runnable { -class Runner; class IEngine; +class Runner; enum class EngineType; /** diff --git a/cpp/mrc/src/internal/runnable/engine.cpp b/cpp/mrc/src/internal/runnable/engine.cpp index ee12c9ba7..fbd6c6e31 100644 --- a/cpp/mrc/src/internal/runnable/engine.cpp +++ b/cpp/mrc/src/internal/runnable/engine.cpp @@ -17,13 +17,10 @@ #include "internal/runnable/engine.hpp" -#include "mrc/types.hpp" +#include "mrc/types.hpp" // for Future -#include - -#include -#include -#include +#include // for mutex, lock_guard +#include // for move namespace mrc::runnable { diff --git a/cpp/mrc/src/public/runnable/context.cpp b/cpp/mrc/src/public/runnable/context.cpp index 24aed6b4e..a1edfd3cc 100644 --- a/cpp/mrc/src/public/runnable/context.cpp +++ b/cpp/mrc/src/public/runnable/context.cpp @@ -17,16 +17,17 @@ #include "mrc/runnable/context.hpp" -#include "mrc/runnable/runner.hpp" +#include "mrc/runnable/runner.hpp" // for Runner -#include -#include +#include // for fiber_specific_ptr +#include // for async +#include // for COMPACT_GOOGLE_LOG_FATAL -#include -#include -#include -#include -#include +#include // for size_t +#include // for exception_ptr, current_excep... +#include // for operator<<, basic_ostream +#include // for char_traits, operator<<, string +#include // for move namespace mrc::runnable { diff --git a/python/mrc/_pymrc/include/pymrc/node.hpp b/python/mrc/_pymrc/include/pymrc/node.hpp index a112bded7..5b590b1f0 100644 --- a/python/mrc/_pymrc/include/pymrc/node.hpp +++ b/python/mrc/_pymrc/include/pymrc/node.hpp @@ -33,17 +33,19 @@ #include "mrc/node/rx_sink.hpp" #include "mrc/node/rx_source.hpp" #include "mrc/runnable/context.hpp" -#include "mrc/runnable/engine.hpp" +#include "mrc/runnable/runner.hpp" // for Runner #include #include #include #include +#include // for uint32_t #include #include #include #include +#include // for thread #include // Avoid forward declaring template specialization base classes @@ -52,6 +54,12 @@ namespace mrc { +namespace runnable { + +class IEngine; + +} + namespace edge { template diff --git a/python/mrc/_pymrc/src/node.cpp b/python/mrc/_pymrc/src/node.cpp index 8f0c63773..210293016 100644 --- a/python/mrc/_pymrc/src/node.cpp +++ b/python/mrc/_pymrc/src/node.cpp @@ -20,12 +20,20 @@ #include "pymrc/executor.hpp" #include +#include // for module_ #include -#include #include -namespace mrc::pymrc { +namespace mrc { + +namespace runnable { + +class IEngine; + +} + +namespace pymrc { PythonNodeLoopHandle::PythonNodeLoopHandle() { @@ -110,4 +118,6 @@ PyHolder PythonNodeContext::get_asyncio_event_loop() return m_loop_handle->get_asyncio_event_loop(); } -} // namespace mrc::pymrc +} // namespace pymrc + +} // namespace mrc diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 722584af6..82cb3a123 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -17,7 +17,6 @@ #include "pymrc/operators.hpp" -#include "pymrc/executor.hpp" #include "pymrc/node.hpp" #include "pymrc/types.hpp" #include "pymrc/utilities/acquire_gil.hpp" @@ -26,10 +25,9 @@ #include "mrc/core/utils.hpp" #include "mrc/runnable/context.hpp" -#include -#include -#include #include +#include +#include #include #include #include @@ -40,11 +38,9 @@ #include #include +#include #include -#include -#include -#include -#include +#include #include #include #include diff --git a/python/mrc/_pymrc/src/segment.cpp b/python/mrc/_pymrc/src/segment.cpp index 5957bbe72..9c614d85b 100644 --- a/python/mrc/_pymrc/src/segment.cpp +++ b/python/mrc/_pymrc/src/segment.cpp @@ -28,12 +28,9 @@ #include "mrc/channel/status.hpp" #include "mrc/edge/edge_builder.hpp" #include "mrc/node/port_registry.hpp" -#include "mrc/node/rx_sink_base.hpp" -#include "mrc/node/rx_source_base.hpp" #include "mrc/runnable/context.hpp" #include "mrc/segment/builder.hpp" #include "mrc/segment/object.hpp" -#include "mrc/types.hpp" #include #include @@ -44,7 +41,6 @@ #include #include #include -#include #include #include #include @@ -52,7 +48,6 @@ #include #include #include -#include // IWYU thinks we need array for py::print // IWYU pragma: no_include From 21becda31bc17c589a93c42faccfcb369cdba9b2 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 19:57:07 +0000 Subject: [PATCH 21/23] iwyu fixes --- python/mrc/_pymrc/include/pymrc/operators.hpp | 4 ++-- python/mrc/core/operators.cpp | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index ae66a21a0..5914b5515 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -19,9 +19,9 @@ #include "pymrc/types.hpp" -#include +#include // for module_ -#include +#include // for uint32_t #include #include diff --git a/python/mrc/core/operators.cpp b/python/mrc/core/operators.cpp index 411059131..eff4c6d3e 100644 --- a/python/mrc/core/operators.cpp +++ b/python/mrc/core/operators.cpp @@ -28,7 +28,6 @@ #include #include // IWYU pragma: keep -#include #include namespace mrc::pymrc { From 6a170da8eeca54f051240776c6b77fbd89164842 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 21:12:13 +0000 Subject: [PATCH 22/23] map_async --- python/mrc/_pymrc/include/pymrc/operators.hpp | 4 +- python/mrc/_pymrc/src/operators.cpp | 94 ++++++++++++++++--- python/mrc/core/operators.cpp | 1 + python/tests/test_operators.py | 16 ++++ 4 files changed, 101 insertions(+), 14 deletions(-) diff --git a/python/mrc/_pymrc/include/pymrc/operators.hpp b/python/mrc/_pymrc/include/pymrc/operators.hpp index 5914b5515..50d87297d 100644 --- a/python/mrc/_pymrc/include/pymrc/operators.hpp +++ b/python/mrc/_pymrc/include/pymrc/operators.hpp @@ -63,13 +63,14 @@ class AsyncOperatorHandler AsyncOperatorHandler(); ~AsyncOperatorHandler() = default; + void process_async_task(PyObjectHolder task, PyObjectSubscriber sink); void process_async_generator(PyObjectHolder asyncgen, PyObjectSubscriber sink); void wait_completed() const; void wait_error(); private: - boost::fibers::future future_from_async_generator(PyObjectHolder asyncgen); + boost::fibers::future future_from_async_task(PyObjectHolder task); pybind11::module_ m_asyncio; uint32_t m_outstanding = 0; bool m_cancelled = false; @@ -82,6 +83,7 @@ class OperatorsProxy static PythonOperator filter(PyFuncHolder filter_fn); static PythonOperator flatten(); static PythonOperator flat_map_async(PyFuncHolder flatmap_fn); + static PythonOperator map_async(PyFuncHolder flatmap_fn); static PythonOperator map(OnDataFunction map_fn); static PythonOperator on_completed(PyFuncHolder()> finally_fn); static PythonOperator pairwise(); diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index 82cb3a123..c65559944 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -200,13 +200,53 @@ void AsyncOperatorHandler::wait_error() wait_completed(); } +void AsyncOperatorHandler::process_async_task(PyObjectHolder task, PyObjectSubscriber sink) +{ + if (m_cancelled) + { + return; + } + + ++m_outstanding; + + runnable::Context::get_runtime_context().launch_fiber([this, sink, task]() { + auto unwinder = Unwinder([this]() { + --m_outstanding; + }); + + using namespace std::chrono_literals; + + auto yielded = this->future_from_async_task(task); + + while (yielded.wait_for(0s) != boost::fibers::future_status::ready) + { + boost::this_fiber::yield(); + } + + if (not sink.is_subscribed()) + { + return; + } + + try + { + sink.on_next(yielded.get()); + } catch (pybind11::error_already_set& ex) + { + sink.on_error(std::current_exception()); + } + }); +} + void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyObjectSubscriber sink) { if (m_cancelled) { - return; // do we need to + return; } + ++m_outstanding; + runnable::Context::get_runtime_context().launch_fiber([this, sink, asyncgen]() { auto unwinder = Unwinder([this]() { --m_outstanding; @@ -215,9 +255,12 @@ void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyOb { using namespace std::chrono_literals; - auto gen_yielded = this->future_from_async_generator(asyncgen); + auto gil = std::make_unique(); + PyObjectHolder task = asyncgen.attr("__anext__")(); + gil.reset(); + auto yielded = this->future_from_async_task(task); - while (gen_yielded.wait_for(0s) != boost::fibers::future_status::ready) + while (yielded.wait_for(0s) != boost::fibers::future_status::ready) { boost::this_fiber::yield(); @@ -229,7 +272,7 @@ void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyOb try { - sink.on_next(gen_yielded.get()); + sink.on_next(yielded.get()); } catch (pybind11::error_already_set& ex) { if (ex.matches(PyExc_StopAsyncIteration)) @@ -242,19 +285,18 @@ void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyOb }); } -boost::fibers::future AsyncOperatorHandler::future_from_async_generator(PyObjectHolder asyncgen) +boost::fibers::future AsyncOperatorHandler::future_from_async_task(PyObjectHolder task) { py::gil_scoped_acquire acquire; - auto& ctx = runnable::Context::get_runtime_context().as(); - auto loop = ctx.get_asyncio_event_loop(); - auto task = asyncgen.attr("__anext__")(); - auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, loop); - auto result = std::make_unique>(); + auto& ctx = runnable::Context::get_runtime_context().as(); + auto loop = ctx.get_asyncio_event_loop(); + auto future = m_asyncio.attr("run_coroutine_threadsafe")(task, loop); + auto promise = std::make_unique>(); - auto result_future = result->get_future(); + auto result_future = promise->get_future(); - future.attr("add_done_callback")(py::cpp_function([result = std::move(result)](py::object future) { + future.attr("add_done_callback")(py::cpp_function([result = std::move(promise)](py::object future) { try { auto acquire = std::make_unique(); @@ -272,7 +314,6 @@ boost::fibers::future AsyncOperatorHandler::future_from_async_ge PythonOperator OperatorsProxy::flat_map_async(PyFuncHolder flatmap_fn) { - // Build and return the map operator return {"flat_map_async", [=](PyObjectObservable source) { return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { auto async_handler = std::make_unique(); @@ -298,6 +339,33 @@ PythonOperator OperatorsProxy::flat_map_async(PyFuncHolder flatmap_fn) +{ + return {"map_async", [=](PyObjectObservable source) { + return rxcpp::observable<>::create([=](PyObjectSubscriber sink) { + auto async_handler = std::make_unique(); + source.subscribe( + sink, + [sink, flatmap_fn, &async_handler = *async_handler](PyHolder value) { + auto acquire = std::make_unique(); + auto task = flatmap_fn(std::move(value)); + acquire.reset(); + async_handler.process_async_task(task, sink); + }, + [sink, &async_handler = *async_handler](std::exception_ptr ex) { + // Forward + async_handler.wait_error(); + sink.on_error(std::current_exception()); + }, + [sink, &async_handler = *async_handler]() { + // Forward + async_handler.wait_completed(); + sink.on_completed(); + }); + }); + }}; +} + PythonOperator OperatorsProxy::map(OnDataFunction map_fn) { // Build and return the map operator diff --git a/python/mrc/core/operators.cpp b/python/mrc/core/operators.cpp index eff4c6d3e..4d41269da 100644 --- a/python/mrc/core/operators.cpp +++ b/python/mrc/core/operators.cpp @@ -56,6 +56,7 @@ PYBIND11_MODULE(operators, py_mod) py_mod.def("flatten", &OperatorsProxy::flatten); py_mod.def("flat_map_async", &OperatorsProxy::flat_map_async); py_mod.def("map", &OperatorsProxy::map); + py_mod.def("map_async", &OperatorsProxy::map_async); py_mod.def("on_completed", &OperatorsProxy::on_completed); py_mod.def("pairwise", &OperatorsProxy::pairwise); py_mod.def("to_list", &OperatorsProxy::to_list); diff --git a/python/tests/test_operators.py b/python/tests/test_operators.py index b0dbe309f..95367ecdb 100644 --- a/python/tests/test_operators.py +++ b/python/tests/test_operators.py @@ -130,6 +130,22 @@ def node_fn(input: mrc.Observable, output: mrc.Subscriber): assert actual == expected +def test_map_async(run_segment): + + input_data = [0, 1, 2, 3] + expected = [0, 1, 4, 9] + + async def square_async(value): + import asyncio + await asyncio.sleep(0) + return value * value + + def node_fn(input: mrc.Observable, output: mrc.Subscriber): + input.pipe(ops.map_async(square_async)).subscribe(output) + + actual, raised_error = run_segment(input_data, node_fn) + + assert set(actual) == set(expected) def test_flat_map_async(run_segment): From e072c638499f92c60ab1be398dd00e636602a68f Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 26 Sep 2023 21:24:00 +0000 Subject: [PATCH 23/23] fix styles --- python/mrc/_pymrc/src/operators.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/mrc/_pymrc/src/operators.cpp b/python/mrc/_pymrc/src/operators.cpp index c65559944..52054ed45 100644 --- a/python/mrc/_pymrc/src/operators.cpp +++ b/python/mrc/_pymrc/src/operators.cpp @@ -255,10 +255,10 @@ void AsyncOperatorHandler::process_async_generator(PyObjectHolder asyncgen, PyOb { using namespace std::chrono_literals; - auto gil = std::make_unique(); + auto gil = std::make_unique(); PyObjectHolder task = asyncgen.attr("__anext__")(); gil.reset(); - auto yielded = this->future_from_async_task(task); + auto yielded = this->future_from_async_task(task); while (yielded.wait_for(0s) != boost::fibers::future_status::ready) {