-
Notifications
You must be signed in to change notification settings - Fork 152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve the HTTP server shutdown logic to respond correctly to Ctrl+C and stop commands #1517
base: branch-24.06
Are you sure you want to change the base?
Improve the HTTP server shutdown logic to respond correctly to Ctrl+C and stop commands #1517
Conversation
class HttpServerSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>> | ||
class HttpServerSourceStage : public PythonRunnableSource<std::shared_ptr<ControlMessage>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we aren't completely on ControlMessage
yet, can we make the output message a template choice?
@@ -68,7 +70,9 @@ using parse_status_t = std::tuple<unsigned /*http status code*/, | |||
* Refer to https://www.boost.org/doc/libs/1_74_0/libs/system/doc/html/system.html#ref_class_error_code for more | |||
* information regarding `boost::system::error_code`. | |||
*/ | |||
using payload_parse_fn_t = std::function<parse_status_t(const std::string& /* post body */)>; | |||
using payload_parse_fn_t = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the docstring to reflect the change here
@@ -121,18 +123,23 @@ class Session : public std::enable_shared_from_this<Session> | |||
|
|||
// Release ownership of the parsed message and move it into the | |||
// handle_request method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move the comment down just above the handle_request
call, otherwise it looks like its related to building up that address_str
// HttpServerSourceStage::subscriber_fn_t HttpServerSourceStage::build() | ||
// { | ||
// return [this](rxcpp::subscriber<source_type_t> subscriber) -> void { | ||
// try | ||
// { | ||
// m_server->start(); | ||
// this->source_generator(subscriber); | ||
// } catch (const SourceStageStopAfter& e) | ||
// { | ||
// DLOG(INFO) << "Completed after emitting " << m_records_emitted << " records"; | ||
// } catch (const std::exception& e) | ||
// { | ||
// LOG(ERROR) << "Encountered error while listening for incoming HTTP requests: " << e.what() << std::endl; | ||
// subscriber.on_error(std::make_exception_ptr(e)); | ||
// return; | ||
// } | ||
// subscriber.on_completed(); | ||
// this->close(); | ||
// }; | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
// void HttpServerSourceStage::source_generator(rxcpp::subscriber<HttpServerSourceStage::source_type_t> subscriber) | ||
// { | ||
// // only check if the server is running when the queue is empty, allowing all queued messages to be processed | ||
// prior | ||
// // to shutting down | ||
// bool server_running = true; | ||
// bool queue_closed = false; | ||
// while (subscriber.is_subscribed() && server_running && !queue_closed) | ||
// { | ||
// table_t table_ptr{nullptr}; | ||
|
||
// auto queue_status = m_queue.pop_wait_for(table_ptr, std::chrono::milliseconds(100)); | ||
|
||
// if (queue_status == boost::fibers::channel_op_status::success) | ||
// { | ||
// // NOLINTNEXTLINE(clang-diagnostic-unused-value) | ||
// DCHECK_NOTNULL(table_ptr); | ||
// try | ||
// { | ||
// auto message = MessageMeta::create_from_cpp(std::move(*table_ptr), 0); | ||
// auto num_records = message->count(); | ||
// subscriber.on_next(std::move(message)); | ||
// m_records_emitted += num_records; | ||
// } catch (const std::exception& e) | ||
// { | ||
// LOG(ERROR) << "Error occurred converting HTTP payload to Dataframe: " << e.what(); | ||
// } | ||
|
||
// if (m_stop_after > 0 && m_records_emitted >= m_stop_after) | ||
// { | ||
// throw SourceStageStopAfter(); | ||
// } | ||
// } | ||
// else if (queue_status == boost::fibers::channel_op_status::timeout) | ||
// { | ||
// // Yield when we have no messages so we can check if the server is still running | ||
// boost::this_fiber::yield(); | ||
// } | ||
// else if (queue_status == boost::fibers::channel_op_status::empty) | ||
// { | ||
// // if the queue is empty, maybe it's because our server is not running | ||
// server_running = m_server->is_running(); | ||
|
||
// if (server_running) | ||
// { | ||
// // Sleep when there are no messages | ||
// std::this_thread::sleep_for(m_sleep_time); | ||
// } | ||
// } | ||
// else if (queue_status == boost::fibers::channel_op_status::closed) | ||
// { | ||
// queue_closed = true; | ||
// } | ||
// else | ||
// { | ||
// std::string error_msg{"Unknown queue status: " + std::to_string(static_cast<int>(queue_status))}; | ||
// LOG(ERROR) << error_msg; | ||
// throw std::runtime_error(error_msg); | ||
// } | ||
// } | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call this out in the details as a part of the PR
# from morpheus.common import FiberQueue | ||
# from morpheus.common import HttpServer | ||
|
||
# with (FiberQueue(self._max_queue_size) as self._queue, | ||
# HttpServer(parse_fn=self._parse_payload, | ||
# bind_address=self._bind_address, | ||
# port=self._port, | ||
# endpoint=self._endpoint, | ||
# method=self._method.value, | ||
# num_threads=self._num_server_threads, | ||
# max_payload_size=self._max_payload_size_bytes, | ||
# request_timeout=self._request_timeout_secs) as http_server): | ||
|
||
# import asyncio | ||
# q = asyncio.Queue(maxsize=self._max_queue_size) | ||
|
||
# q. | ||
|
||
# self._processing = True | ||
# while self._processing: | ||
# # Read as many messages as we can from the queue if it's empty check to see if we should be shutting | ||
# # down. It is important that any messages we received that are in the queue are processed before we | ||
# # shutdown since we already returned an OK response to the client. | ||
# df = None | ||
# try: | ||
# df = self._queue.get(block=False) | ||
# except queue.Empty: | ||
# if (not http_server.is_running()): | ||
# self._processing = False | ||
# else: | ||
# logger.debug("Queue empty, sleeping ...") | ||
# time.sleep(self._sleep_time) | ||
# except Closed: | ||
# logger.error("Queue closed unexpectedly, shutting down") | ||
# self._processing = False | ||
|
||
# if df is not None: | ||
# num_records = len(df) | ||
# yield MessageMeta(df) | ||
# self._records_emitted += num_records | ||
|
||
# if self._stop_after > 0 and self._records_emitted >= self._stop_after: | ||
# self._processing = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
num_records = len(df) | ||
yield MessageMeta(df) | ||
|
||
while True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I don't like break
statements I would instead do:
done = False
while not done:
....
except Closed:
done = True
with (FiberQueue(self._max_queue_size) as self._queue, | ||
HttpServer(parse_fn=self._parse_payload, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really wish I had left a comment here, but I have a vague recollection having difficulty ensuring that the HttpServer
was properly shutdown if and only if the queue was emptied. Which is why I went with lazily constructing these inside a context manager.
With that said, at the time there was a memory leak in Morpheus which prevented stages from being destroyed, which has since been fixed. So I'm not 100% this was needed or I was unwittingly just working-around the memory leak.
|
||
|
||
def retry_async(retry_exceptions=None): | ||
import tenacity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This appears to already be in our env as a transitive dep, but we should add it to dependencies.yaml
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look into moving similar code from sherlock.
Description
This PR refactors the C++ HTTP server to allow it to correctly stop processing when a user presses Ctrl+C or
pipeline.stop()
is called.By Submitting this PR I confirm: