From e01fcafb8bc3056b1670feb3a62c2d245874c630 Mon Sep 17 00:00:00 2001
From: Keming
Date: Mon, 11 Oct 2021 10:15:30 +0800
Subject: [PATCH] add python metric example (#73)

* add python side metric example

* add doc

* fix lint

* move pid to init func

* update metrics doc

* Apply suggestions from code review

Co-authored-by: zclzc <38581401+lkevinzc@users.noreply.github.com>

* update metrics doc, rm confusing comments

Co-authored-by: zclzc <38581401+lkevinzc@users.noreply.github.com>
---
 docs/example/metric.md          | 36 ++++++++++++++
 examples/python_side_metrics.py | 84 +++++++++++++++++++++++++++++++++
 mkdocs.yml                      |  1 +
 3 files changed, 121 insertions(+)
 create mode 100644 docs/example/metric.md
 create mode 100644 examples/python_side_metrics.py

diff --git a/docs/example/metric.md b/docs/example/metric.md
new file mode 100644
index 00000000..6788e0ed
--- /dev/null
+++ b/docs/example/metric.md
@@ -0,0 +1,36 @@
+This is an example demonstrating how to add your customized Python side Prometheus metrics.
+
+Mosec already has the Rust side metrics, including:
+
+* throughput for the inference endpoint
+* duration for each stage (including the IPC time)
+* batch size (only for the `max_batch_size > 1` workers)
+* number of remaining tasks to be processed
+
+If you need to monitor more details about the inference process, you can add some Python side metrics. E.g., the inference result distribution, the duration of some CPU-bound or GPU-bound processing, the IPC time (obtained from `rust_step_duration - python_step_duration`).
+
+This example has a simple WSGI app as the monitoring metrics service. In each worker process, the `Counter` will collect the inference results and export them to the metrics service. For the inference part, it parses the batch data and compares them with the average value.
+
+For more information about the multiprocess mode for the metrics, check the [Prometheus doc](https://github.com/prometheus/client_python#multiprocess-mode-eg-gunicorn).
+
+#### **`python_side_metrics.py`**
+
+```python
+--8<-- "examples/python_side_metrics.py"
+```
+
+#### Start
+
+    python python_side_metrics.py
+
+#### Test
+
+    http POST :8000/inference num=1
+
+#### Check the Python side metrics
+
+    http :8080
+
+#### Check the Rust side metrics
+
+    http :8000/metrics
diff --git a/examples/python_side_metrics.py b/examples/python_side_metrics.py
new file mode 100644
index 00000000..0c962052
--- /dev/null
+++ b/examples/python_side_metrics.py
@@ -0,0 +1,84 @@
+import logging
+import os
+import pathlib
+import tempfile
+import threading
+from typing import List
+from wsgiref.simple_server import make_server
+
+from mosec import Server, Worker
+from mosec.errors import ValidationError
+
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
+formatter = logging.Formatter(
+    "%(asctime)s - %(process)d - %(levelname)s - %(filename)s:%(lineno)s - %(message)s"
+)
+sh = logging.StreamHandler()
+sh.setFormatter(formatter)
+logger.addHandler(sh)
+
+
+# check the PROMETHEUS_MULTIPROC_DIR environment variable before importing Prometheus
+if not os.environ.get("PROMETHEUS_MULTIPROC_DIR"):
+    metric_dir_path = os.path.join(tempfile.gettempdir(), "prometheus_multiproc_dir")
+    pathlib.Path(metric_dir_path).mkdir(parents=True, exist_ok=True)
+    os.environ["PROMETHEUS_MULTIPROC_DIR"] = metric_dir_path
+
+from prometheus_client import (  # type: ignore # noqa: E402
+    CONTENT_TYPE_LATEST,
+    CollectorRegistry,
+    Counter,
+    generate_latest,
+    multiprocess,
+)
+
+metric_registry = CollectorRegistry()
+multiprocess.MultiProcessCollector(metric_registry)
+counter = Counter("inference_result", "statistic of result", ("status", "pid"))
+
+
+def metric_app(environ, start_response):
+    data = generate_latest(metric_registry)
+    start_response(
+        "200 OK",
+        [("Content-Type", CONTENT_TYPE_LATEST), ("Content-Length", str(len(data)))],
+    )
+    return iter([data])
+
+
+def metric_service(host="", port=8080):
+    with make_server(host, port, metric_app) as httpd:
+        httpd.serve_forever()
+
+
+class Inference(Worker):
+    def __init__(self):
+        super().__init__()
+        self.pid = str(os.getpid())
+
+    def deserialize(self, data: bytes) -> int:
+        json_data = super().deserialize(data)
+        try:
+            res = int(json_data.get("num"))
+        except Exception as err:
+            raise ValidationError(err)
+        return res
+
+    def forward(self, data: List[int]) -> List[bool]:
+        avg = sum(data) / len(data)
+        ans = [x >= avg for x in data]
+        counter.labels(status="true", pid=self.pid).inc(sum(ans))
+        counter.labels(status="false", pid=self.pid).inc(len(ans) - sum(ans))
+        return ans
+
+
+if __name__ == "__main__":
+    # Run the metrics server in another thread.
+    metric_thread = threading.Thread(target=metric_service, daemon=True)
+    metric_thread.start()
+
+    # Run the inference server
+    server = Server()
+    server.append_worker(Inference, max_batch_size=8)
+    server.run()
diff --git a/mkdocs.yml b/mkdocs.yml
index d65e3cdb..e11c7206 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -17,6 +17,7 @@ nav:
     - Overview: "example/index.md"
     - Echo: "example/echo.md"
     - PyTorch: "example/pytorch.md"
+    - Metrics: "example/metric.md"
 # - Deploy: "deploy.md"
 # - FAQ: "faq.md"
 # - Benchmark: "BENCHMARK.md"