Allow metrics registry to be passed through PrometheusMiddleware #176

Open

wants to merge 3 commits into base: develop
Conversation

nickdichev-firework

Registers the prometheus metrics with a CollectorRegistry as specified by the prometheus-client docs: https://github.com/prometheus/client_python/blob/master/README.md#multiprocess-mode-eg-gunicorn

I also noticed that the documentation explicitly says not to overwrite PROMETHEUS_MULTIPROC_DIR, which the middleware currently does. It seems to work for me locally, but I wonder if it could be a problem?

@s3rius (Member) left a comment

@nickdichev-firework, thanks for the request, and sorry for the late reply. I think we need to add an option to use custom registries.

The idea is to add a registry parameter to the PrometheusMiddleware constructor. Otherwise, if I wanted workers to expose additional metrics, I would have to spin up another Prometheus server on a different port, which is not exactly elegant.

Can you please add this parameter?
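
Something along these lines, perhaps (a minimal sketch; the metric name and default shown are illustrative, not the middleware's actual ones):

from prometheus_client import REGISTRY, CollectorRegistry, Counter
from taskiq import TaskiqMiddleware


class PrometheusMiddleware(TaskiqMiddleware):
    def __init__(self, registry: CollectorRegistry = REGISTRY) -> None:
        super().__init__()
        # Register every metric against the caller-supplied registry,
        # falling back to prometheus_client's global default.
        self.received_tasks = Counter(
            "received_tasks",
            "Number of received tasks",
            registry=registry,
        )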

@nickdichev-firework (Author)

@s3rius Yes, that makes sense. In fact, your comment made me realize my application does have some metrics in another registry which don't make sense to collect/expose from the primary application; I now need to collect/expose them from the workers.
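
(Concretely, a separate registry is just a standalone CollectorRegistry; the names below are illustrative:)

from prometheus_client import CollectorRegistry, Counter

# Worker-only metrics live in their own registry so they stay out of
# the primary application's /metrics endpoint.
worker_registry = CollectorRegistry()
worker_tasks = Counter(
    "worker_tasks",
    "Tasks handled by workers",
    registry=worker_registry,
)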

I tried to make the change; however, I quickly ran into the same Duplicated timeseries in CollectorRegistry error when the worker application starts up. I tried all sorts of things but couldn't quite figure out how to get it to work.

I tried registering the "application" registry with prometheus_client.multiprocess in the application, inside the middleware, and even with a custom HTTP server that registers it on each request, as the prometheus-client docs recommend. I could never get it to work quite right: I got either the "duplicated timeseries" error or duplicate metrics in the output.
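
For reference, the multiprocess collection pattern from the prometheus-client README that I was following looks roughly like this (a fresh registry per exposition, collected from the files in PROMETHEUS_MULTIPROC_DIR):

from prometheus_client import CollectorRegistry, make_wsgi_app
from prometheus_client import multiprocess

# Collect samples written by all worker processes into one registry
# and expose them via a WSGI app.
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
app = make_wsgi_app(registry)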

Also, this solution relies on the fact that taskiq uses multiprocessing to run multiple workers by default. However, it is possible that a caller of the library will run with the --workers 1 CLI flag. Multiprocess mode limits the flexibility of the prometheus client, as noted in their docs. What should be done in this case?

s3rius commented Jul 18, 2023

Can you please show me which metrics your application exports along with taskiq's metrics?

s3rius commented Jul 18, 2023

I ask because I can run metrics without any issue with a custom counter metric:

import asyncio
from taskiq import PrometheusMiddleware
from prometheus_client import Counter
from taskiq_redis import ListQueueBroker

my_custom_counter = Counter("my_custom_ccc", "Made to test #176")

broker = ListQueueBroker("redis://localhost").with_middlewares(PrometheusMiddleware())


@broker.task()
async def mytask(val: int):
    my_custom_counter.inc(val)


async def main():
    await broker.startup()

    for i in range(10):
        await mytask.kiq(i)

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Run the worker, then the producer script (the file above saved as br.py):

taskiq worker br:broker -w 4
python br.py

This code runs fine, even with multiple workers.


nickdichev-firework commented Jul 18, 2023

@s3rius Interesting, I did some more debugging with your example and was able to run it just fine. What I noticed, however, is that the error did happen when I ran taskiq worker with the --fs-discover flag.

I think what is happening is that my tasks.py file imports the broker, and this causes some sort of circular dependency when registering the metrics? To work around it, I avoided creating the PrometheusMiddleware in the global scope and instead added it in the startup callback:

@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    sentry.init_sentry()

    # broker.add_middlewares([PrometheusMiddleware()])
    prom_middleware = PrometheusMiddleware()
    prom_middleware.set_broker(broker)
    broker.middlewares.append(prom_middleware)

This worked! As you can see, I also tried adding it with a call to add_middlewares, but that always produces the error. I'm not sure why, since the middleware's __init__ does the same if check and it returns True... I will need to look into it more.
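
For reference, the error itself reproduces whenever the same metric name is registered twice in one registry, which is exactly what a double import of the module defining the metrics would do:

from prometheus_client import Counter

Counter("my_metric", "registered once")
# A second registration of the same name in the default registry raises
# ValueError: Duplicated timeseries in CollectorRegistry: {'my_metric', ...}
Counter("my_metric", "registered twice")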

In any case, I am happy to update my PR to allow a registry to be passed through __init__. The only problem I am having is what to do about the imports: I see that they are checked in __init__, but I need a couple of them in the global namespace to get a proper type and default parameter for the metrics registry.
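
One way to square the optional import with a module-level type and default (a sketch of the idea, not necessarily what I pushed) is to guard the import and use None as the default:

from typing import Optional

try:
    from prometheus_client import REGISTRY, CollectorRegistry
except ImportError:
    REGISTRY = None  # type: ignore
    CollectorRegistry = None  # type: ignore


class PrometheusMiddleware:
    def __init__(self, registry: Optional["CollectorRegistry"] = None) -> None:
        if CollectorRegistry is None:
            raise ImportError(
                "prometheus_client is not installed, "
                "but it is required for PrometheusMiddleware",
            )
        # None means "use prometheus_client's global default registry".
        self.registry = registry if registry is not None else REGISTRY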

I have pushed my change and left a comment in the part I'm not so sure about. Thanks for the help!

@@ -10,6 +10,18 @@

logger = getLogger("taskiq.prometheus")

try:


Is it ok to pull all this out from __init__?

@nickdichev-firework changed the title from "Register prometheus metrics with multiprocess collector" to "Allow metrics registry to be passed through PrometheusMiddleware" on Jul 19, 2023
s3rius commented Jul 19, 2023

I have another question: do you use other brokers? Maybe you start multiple brokers and they all try to add the same metrics?

nickdichev-firework commented Jul 19, 2023

No, I only have one broker.

# src.nebula.broker

from uuid import uuid4
import os

from taskiq import InMemoryBroker, TaskiqEvents, TaskiqState, PrometheusMiddleware
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from nebula.config import REDIS_URL
from nebula.utils import sentry
from nebula.utils.taskiq_middleware import SentryException

three_days_in_seconds = 3 * 24 * 60 * 60
redis_async_result = RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=three_days_in_seconds)


def task_id_generator() -> str:
    # TODO: interpolate with function call to taskiq default id generator
    # (the implementations are currently the same)
    # https://github.com/taskiq-python/taskiq/blob/cd4104fdfc4353aa6d9e0faafc4bae4a7afa3c09/taskiq/abc/broker.py#L50
    return f"nebula_tasks:{uuid4().hex}"


broker = (
    ListQueueBroker(url=REDIS_URL, queue_name="nebula_tasks")
    .with_result_backend(redis_async_result)
    .with_id_generator(task_id_generator)
    .with_middlewares(SentryException())
)


env = os.environ.get("NEBULA_ENVIRONMENT")
if env and env == "test":
    broker = InMemoryBroker()


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    sentry.init_sentry()

    # broker.add_middlewares([PrometheusMiddleware()])
    prom_middleware = PrometheusMiddleware()
    prom_middleware.set_broker(broker)
    broker.middlewares.append(prom_middleware)


# src.nebula.core.tasks

from nebula.broker import broker
from nebula.core import lics


@broker.task
async def process_live_stream(replay_ready):
    return await lics.process_live_stream(replay_ready)


@broker.task
async def answer_question(suggest_auto_response):
    return await lics.answer_question(suggest_auto_response)

I am running the worker with: pipenv run taskiq worker --fs-discover --workers 2 src.nebula.broker:broker

s3rius commented Aug 9, 2023

@nickdichev-firework, sorry, I was on vacation. I ran your example without an issue; I just removed the Sentry middleware.

import os
from uuid import uuid4

from prometheus_client import Gauge
from taskiq import InMemoryBroker, PrometheusMiddleware
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

my_gauge = Gauge("a", "test")

three_days_in_seconds = 3 * 24 * 60 * 60
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost",
    result_ex_time=three_days_in_seconds,
)


def task_id_generator() -> str:
    # TODO: interpolate with function call to taskiq default id generator
    # (the implementations are currently the same)
    # https://github.com/taskiq-python/taskiq/blob/cd4104fdfc4353aa6d9e0faafc4bae4a7afa3c09/taskiq/abc/broker.py#L50
    return f"nebula_tasks:{uuid4().hex}"


broker = (
    ListQueueBroker(url="redis://localhost", queue_name="nebula_tasks")
    .with_result_backend(redis_async_result)
    .with_id_generator(task_id_generator)
    .with_middlewares(PrometheusMiddleware())
)


env = os.environ.get("NEBULA_ENVIRONMENT")
if env and env == "test":
    broker = InMemoryBroker()

I modified it a little bit, but it does the same thing. Can you please try to create a new project and run this file with taskiq worker?
