Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] schema_registry: Add metrics #24269

Open
wants to merge 3 commits into
base: v24.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void probe::setup_metrics() {
return _request_metrics.hist().internal_histogram_logform();
})},
{},
{sm::shard_label, operation_label});
{sm::shard_label});
}

void probe::setup_public_metrics() {
Expand Down
138 changes: 130 additions & 8 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@

#pragma once

#include "config/configuration.h"
#include "container/fragmented_vector.h"
#include "metrics/metrics.h"
#include "metrics/prometheus_sanitize.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/types.h"

#include <seastar/core/metrics.hh>

#include <absl/algorithm/container.h>
#include <absl/container/btree_map.h>
#include <absl/container/btree_set.h>
#include <absl/container/node_hash_map.h>

#include <optional>
#include <ranges>

namespace pandaproxy::schema_registry {

///\brief A mapping of version and schema id for a subject.
Expand Down Expand Up @@ -58,10 +66,13 @@ class store {
public:
using schema_id_set = absl::btree_set<schema_id>;

explicit store() = default;
explicit store()
: store(is_mutable::no) {}

explicit store(is_mutable mut)
: _mutable(mut) {}
: _mutable(mut) {
setup_metrics();
}

struct insert_result {
schema_version version;
Expand Down Expand Up @@ -543,7 +554,7 @@ class store {
result<bool>
set_mode(seq_marker marker, const subject& sub, mode m, force f) {
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
auto& sub_entry = _subjects[sub];
auto& sub_entry = get_or_create_subject_entry(sub);
sub_entry.written_at.push_back(marker);
return std::exchange(sub_entry.mode, m) != m;
}
Expand Down Expand Up @@ -589,7 +600,7 @@ class store {
seq_marker marker,
const subject& sub,
compatibility_level compatibility) {
auto& sub_entry = _subjects[sub];
auto& sub_entry = get_or_create_subject_entry(sub);
sub_entry.written_at.push_back(marker);
return std::exchange(sub_entry.compatibility, compatibility)
!= compatibility;
Expand Down Expand Up @@ -637,7 +648,7 @@ class store {
bool inserted;
};
insert_subject_result insert_subject(subject sub, schema_id id) {
auto& subject_entry = _subjects[std::move(sub)];
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
subject_entry.deleted = is_deleted::no;
auto& versions = subject_entry.versions;
const auto v_it = std::find_if(
Expand All @@ -661,7 +672,7 @@ class store {
schema_version version,
schema_id id,
is_deleted deleted) {
auto& subject_entry = _subjects[std::move(sub)];
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
auto& versions = subject_entry.versions;
subject_entry.written_at.push_back(marker);

Expand Down Expand Up @@ -705,6 +716,67 @@ class store {
return outcome::success();
}

void setup_metrics() {
namespace sm = ss::metrics;
const auto make_schema_count = [this]() {
return sm::make_gauge(
"schema_count",
[this] { return _schemas.size(); },
sm::description("The number of schemas in the store"));
};
const auto make_subject_count = [this](is_deleted deleted) {
return sm::make_gauge(
"subject_count",
[this, deleted] {
return std::ranges::count_if(
_subjects, [deleted](const auto& entry) {
return entry.second.deleted == deleted;
});
},
sm::description("The number of subjects in the store"),
{sm::label{"deleted"}(deleted)});
};
const auto make_schema_bytes = [this]() {
return sm::make_gauge(
"schema_memory_bytes",
[this] {
return absl::c_accumulate(
_schemas | std::views::transform([](const auto& s) {
return s.second.definition.raw()().size_bytes();
}),
size_t{0});
},
sm::description("The memory usage of schemas in the store"));
};
auto group_name = prometheus_sanitize::metrics_name(
"schema_registry_cache");
const std::vector<sm::label> agg{{sm::shard_label}};

if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
group_name,
{
make_schema_count(),
make_schema_bytes(),
make_subject_count(is_deleted::no),
make_subject_count(is_deleted::yes),
},
{},
agg);
}

if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(
group_name,
{
make_schema_count().aggregate(agg),
make_schema_bytes().aggregate(agg),
make_subject_count(is_deleted::no).aggregate(agg),
make_subject_count(is_deleted::yes).aggregate(agg),
});
}
};

private:
struct schema_entry {
explicit schema_entry(canonical_schema_definition definition)
Expand All @@ -713,17 +785,65 @@ class store {
canonical_schema_definition definition;
};

struct subject_entry {
class subject_entry {
public:
explicit subject_entry(const subject& sub) { setup_metrics(sub); }
std::optional<compatibility_level> compatibility;
std::optional<mode> mode;
std::vector<subject_version_entry> versions;
is_deleted deleted{false};

std::vector<seq_marker> written_at;

private:
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;

void setup_metrics(const subject& sub) {
namespace sm = ss::metrics;
auto group_name = prometheus_sanitize::metrics_name(
"schema_registry_cache");
const auto make_subject_version_count = [this,
&sub](is_deleted deleted) {
return sm::make_gauge(
"subject_version_count",
[this, deleted] {
return std::ranges::count_if(
versions, [deleted](const subject_version_entry& v) {
return v.deleted == deleted;
});
},
sm::description("The number of versions in the subject"),
{
sm::label{"subject"}(sub),
sm::label{"deleted"}(deleted),
});
};
if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
group_name,
{make_subject_version_count(is_deleted::no),
make_subject_version_count(is_deleted::yes)},
{},
{sm::shard_label});
}
if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(
group_name,
{make_subject_version_count(is_deleted::no)
.aggregate({sm::shard_label}),
make_subject_version_count(is_deleted::yes)
.aggregate({sm::shard_label})});
}
}
};
using schema_map = absl::btree_map<schema_id, schema_entry>;
using subject_map = absl::node_hash_map<subject, subject_entry>;

subject_entry& get_or_create_subject_entry(subject sub) {
return _subjects.try_emplace(sub, sub).first->second;
}

result<subject_map::iterator>
get_subject_iter(const subject& sub, include_deleted inc_del) {
const store* const_this = this;
Expand Down Expand Up @@ -781,7 +901,9 @@ class store {
subject_map _subjects;
compatibility_level _compatibility{compatibility_level::backward};
mode _mode{mode::read_write};
is_mutable _mutable{is_mutable::no};
is_mutable _mutable;
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};

} // namespace pandaproxy::schema_registry
Loading