Metrics #3216
@@ -0,0 +1,178 @@
from sqlalchemy import func
from prometheus_client.core import GaugeMetricFamily, InfoMetricFamily
from prometheus_client.registry import Collector
from followthemoney import __version__ as ftm_version

from aleph import __version__ as aleph_version
from aleph.core import create_app as create_flask_app
from aleph.queues import get_active_dataset_status
from aleph.model import Role, Collection, EntitySet, Bookmark


class InfoCollector(Collector):
    def collect(self):
        yield InfoMetricFamily(
            "aleph_system",
            "Aleph system information",
            value={
                "aleph_version": aleph_version,
                "ftm_version": ftm_version,
            },
        )


class DatabaseCollector(Collector):
    def __init__(self):
        self._flask_app = create_flask_app()

    def collect(self):
        with self._flask_app.app_context():
            yield self._users()
            yield self._collections()
            yield self._collection_users()
            yield self._entitysets()
            yield self._entityset_users()
            yield self._bookmarks()
            yield self._bookmark_users()

    def _users(self):
        return GaugeMetricFamily(
            "aleph_users",
            "Total number of users",
            value=Role.all_users().count(),
        )
Comment on lines +38 to +43: This metric could be extended in the future. In particular, it might be interesting to expose the number of users that have been active within the past 24h, 7d, 30d, etc. This requires some additional, non-trivial work because we’d need to track when users signed in the last time, so I decided not to implement it in this PR.
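For illustration only, such an extension might look roughly like the sketch below, as another method on DatabaseCollector. The `Role.last_login_at` column is hypothetical; tracking it is exactly the additional work described above and is not part of this PR.

```python
from datetime import datetime, timedelta

def _active_users(self):
    # Hypothetical sketch: assumes a Role.last_login_at column that does not
    # exist yet and would need to be updated on every sign-in.
    gauge = GaugeMetricFamily(
        "aleph_active_users",
        "Number of users active within a given time window",
        labels=["window"],
    )

    windows = {
        "24h": timedelta(hours=24),
        "7d": timedelta(days=7),
        "30d": timedelta(days=30),
    }

    for label, delta in windows.items():
        cutoff = datetime.utcnow() - delta
        count = Role.all_users().filter(Role.last_login_at >= cutoff).count()
        gauge.add_metric([label], count)

    return gauge
```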
    def _collections(self):
        gauge = GaugeMetricFamily(
            "aleph_collections",
            "Total number of collections by category",
            labels=["category"],
        )

        query = (
            Collection.all()
            .with_entities(Collection.category, func.count())
            .group_by(Collection.category)
        )

        for category, count in query:
            gauge.add_metric([category], count)

        return gauge
Comment on lines +45 to +61: This metric could be extended in the future to expose the number of collections by country.
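As a rough sketch of that idea (not part of this PR), a per-country breakdown could aggregate the collections' country codes. That `Collection.countries` exists and holds a list of country codes per collection is an assumption here; the counting is done in Python to keep the example free of Postgres-specific unnest SQL.

```python
from collections import Counter

def _collection_countries(self):
    # Hypothetical sketch: assumes Collection.countries holds a list of
    # country codes for each collection.
    gauge = GaugeMetricFamily(
        "aleph_collection_countries",
        "Total number of collections by country",
        labels=["country"],
    )

    counts = Counter()
    for (countries,) in Collection.all().with_entities(Collection.countries):
        for country in countries or []:
            counts[country] += 1

    for country, count in counts.items():
        gauge.add_metric([country], count)

    return gauge
```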
    def _collection_users(self):
        gauge = GaugeMetricFamily(
            "aleph_collection_users",
            "Total number of users that have created at least one collection",
            labels=["category"],
        )

        query = (
            Collection.all()
            .with_entities(
                Collection.category,
                func.count(func.distinct(Collection.creator_id)),
            )
            .group_by(Collection.category)
        )

        for category, count in query:
            gauge.add_metric([category], count)

        return gauge

    def _entitysets(self):
        gauge = GaugeMetricFamily(
            "aleph_entitysets",
            "Total number of entity sets by type",
            labels=["type"],
        )

        query = (
            EntitySet.all()
            .with_entities(EntitySet.type, func.count())
            .group_by(EntitySet.type)
        )

        for entityset_type, count in query:
            gauge.add_metric([entityset_type], count)

        return gauge

    def _entityset_users(self):
        gauge = GaugeMetricFamily(
            "aleph_entityset_users",
            "Number of users that have created at least one entity set of the given type",
            labels=["type"],
        )

        query = (
            EntitySet.all()
            .with_entities(
                EntitySet.type,
                func.count(func.distinct(EntitySet.role_id)),
            )
            .group_by(EntitySet.type)
        )

        for entityset_type, count in query:
            gauge.add_metric([entityset_type], count)

        return gauge

    def _bookmarks(self):
        return GaugeMetricFamily(
            "aleph_bookmarks",
            "Total number of bookmarks",
            value=Bookmark.query.count(),
        )

    def _bookmark_users(self):
        return GaugeMetricFamily(
            "aleph_bookmark_users",
            "Number of users that have created at least one bookmark",
            value=Bookmark.query.distinct(Bookmark.role_id).count(),
        )


class QueuesCollector(Collector):
    def collect(self):
        status = get_active_dataset_status()

        yield GaugeMetricFamily(
            "aleph_active_datasets",
            "Total number of active datasets",
            value=status["total"],
        )

        stages = {}

        for collection_status in status["datasets"].values():
            for job_status in collection_status["jobs"]:
                for stage_status in job_status["stages"]:
                    stage = stage_status["stage"]
                    pending = stage_status["pending"]
                    running = stage_status["running"]

                    if stage not in stages:
                        stages[stage] = {
                            "pending": 0,
                            "running": 0,
                        }

                    stages[stage] = {
                        "pending": stages[stage].get("pending") + pending,
                        "running": stages[stage].get("running") + running,
                    }

        tasks_gauge = GaugeMetricFamily(
            "aleph_tasks",
            "Total number of pending or running tasks in a given stage",
            labels=["stage", "status"],
        )

        for stage, tasks in stages.items():
            tasks_gauge.add_metric([stage, "pending"], tasks["pending"])
            tasks_gauge.add_metric([stage, "running"], tasks["running"])

        yield tasks_gauge
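For context (not part of the diff), the collectors above can be rendered locally with prometheus_client's text exposition helper, for example from a Flask shell; the label values in the comment are illustrative only:

```python
# Illustrative sketch: render the collectors' output in Prometheus text format.
from prometheus_client import generate_latest
from prometheus_client.core import CollectorRegistry

registry = CollectorRegistry()
registry.register(InfoCollector())
registry.register(DatabaseCollector())
registry.register(QueuesCollector())

# Prints lines such as:
#   aleph_users 42.0
#   aleph_tasks{stage="ingest",status="pending"} 3.0
print(generate_latest(registry).decode("utf-8"))
```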
@@ -0,0 +1,17 @@
from prometheus_client import make_wsgi_app, PLATFORM_COLLECTOR
from prometheus_client.core import CollectorRegistry

from aleph.metrics.collectors import InfoCollector, DatabaseCollector, QueuesCollector


def create_app():
    registry = CollectorRegistry()
    registry.register(PLATFORM_COLLECTOR)
    registry.register(InfoCollector())
    registry.register(DatabaseCollector())
    registry.register(QueuesCollector())

    return make_wsgi_app(registry=registry)


app = create_app()
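As a quick local sanity check (a sketch, not part of this PR), the WSGI app above could be served with the standard library's wsgiref server and inspected with curl; a real deployment would typically put it behind a production WSGI server instead. The port number is arbitrary.

```python
# Sketch: append to the module above to serve the metrics endpoint locally,
# then inspect it with `curl localhost:9100/metrics`.
from wsgiref.simple_server import make_server

if __name__ == "__main__":
    httpd = make_server("0.0.0.0", 9100, app)
    httpd.serve_forever()
```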
One additional metric related to xref that would be interesting is how long it takes to process a batch of entities from a single ES scroll response. This could be used to alert us when we are getting close to the scroll timeout.

I haven’t implemented this so far because the individual scroll requests are abstracted away by the `scan` helper from the ES Python client, so it would require a non-trivial amount of work and would increase complexity.
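To make that trade-off concrete, measuring per-batch fetch time would roughly mean replacing the `scan` helper with a manual search/scroll loop so each request can be observed individually, along the lines of the sketch below. The client handle, index, query, and metric name are assumptions, not code from this PR.

```python
from prometheus_client import Histogram

# Hypothetical metric; the name is a placeholder.
XREF_SCROLL_DURATION = Histogram(
    "aleph_xref_scroll_duration_seconds",
    "Time spent fetching one batch of entities from an ES scroll",
)

def scroll_batches(es, index, query, scroll="5m", size=1000):
    # Manual search/scroll loop instead of elasticsearch.helpers.scan, so the
    # duration of every batch fetch can be timed individually.
    with XREF_SCROLL_DURATION.time():
        res = es.search(index=index, body=query, scroll=scroll, size=size)
    while res["hits"]["hits"]:
        yield res["hits"]["hits"]
        with XREF_SCROLL_DURATION.time():
            res = es.scroll(scroll_id=res["_scroll_id"], scroll=scroll)
```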