Skip to content

Commit

Permalink
fix monotonic counter implementations (#90)
Browse files Browse the repository at this point in the history
This change fixes a couple of assumptions that were made in #87 and #88. These
changes were made to address a usage issue for monotonic counters in the
spectator-go thin client library. For that library, a common use case is to
sample uint64 monotonic data sources, so the data type for that meter type in
the library was set to uint64. This lead to a need to cascade that change into
spectatord.

However, changing the existing data type for `C` from `double` to `uint64_t`
caused the monotonic counter usage in atlas-system-agent to start partially
failing on parsing numbers. When we made these changes, we overlooked the
fact that the `double` data type is mostly used when monotonic data sources
need to be converted into base units for recording values. So, a monotonic
data source recording microsecond values would have to be divided by 1000,
and this results in a fractional number, which is not parsed by `strtoull`.

Example error message:

```
While parsing sys.pressure.some,xatlas.process=atlas-system-agent,id=io:117.094624:
Got 117 parsing value, ignoring chars starting at .094624
```

The correct way to support all of these use cases, and to preserve backwards
compatibility, is to introduce a new meter type `MonotonicCounterUint` which
supports the `uint64_t` data type and handles rollovers, while reverting the
changes to `MonotonicCounters` (`C`) and `MonotonicSampled` (`X`). In reverting
the changes, we kept the original implemenation of disallowing negative value
updates, because there is not a use case where that should happen.

We chose not to create a `MonotonicSampledUint` at this point in time, because
this is an experimental meter type and there is not currently any demand for it.

This strategy mirrors the spectator-java implementation:

* https://www.javadoc.io/static/com.netflix.spectator/spectator-api/1.7.13/com/netflix/spectator/api/patterns/PolledMeter.Builder.html#monitorMonotonicCounter-T-java.util.function.ToLongFunction-
* https://www.javadoc.io/static/com.netflix.spectator/spectator-api/1.7.13/com/netflix/spectator/api/patterns/PolledMeter.Builder.html#monitorMonotonicCounterDouble-T-java.util.function.ToDoubleFunction-

This change updates the admin server `/metrics` endpoint, so that the new meter
type will be reflected in the output. A `stats` block is also added to the
payload to help make it easier to get a sense of the number of meters that are
in use.
  • Loading branch information
copperlight authored Jun 14, 2024
1 parent 234a309 commit 4e0f274
Show file tree
Hide file tree
Showing 17 changed files with 534 additions and 263 deletions.
40 changes: 23 additions & 17 deletions README.md

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions admin/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ void GET_metrics(HTTPServerRequest& req, HTTPServerResponse& res, spectator::Reg
}
obj->set("mono_counters", mono_counters);

Poco::JSON::Array::Ptr mono_counters_uint = new Poco::JSON::Array(true);
for (auto it : registry.MonotonicCountersUint()) {
auto meter = fmt_meter_object((spectator::Meter*)it);
meter->set("value", fmt::format("{}", it->Delta()));
mono_counters_uint->add(meter);
}
obj->set("mono_counters_uint", mono_counters_uint);

Poco::JSON::Array::Ptr timers = new Poco::JSON::Array(true);
for (auto it : registry.Timers()) {
auto meter = fmt_meter_object((spectator::Meter*)it);
Expand All @@ -215,6 +223,28 @@ void GET_metrics(HTTPServerRequest& req, HTTPServerResponse& res, spectator::Reg
}
obj->set("timers", timers);

Object::Ptr stats = new Object(true);
auto age_gauges_size = registry.AgeGauges().size();
auto counters_size = registry.Counters().size();
auto dist_summaries_size = registry.DistSummaries().size();
auto gauges_size = registry.Gauges().size();
auto max_gauges_size = registry.MaxGauges().size();
auto mono_counters_size = registry.MonotonicCounters().size();
auto mono_counters_uint_size = registry.MonotonicCountersUint().size();
auto timers_size = registry.Timers().size();
auto total = age_gauges_size + counters_size + dist_summaries_size + gauges_size +
max_gauges_size + mono_counters_size + mono_counters_uint_size + timers_size;
stats->set("age_gauges.size", age_gauges_size);
stats->set("counters.size", counters_size);
stats->set("dist_summaries.size", dist_summaries_size);
stats->set("gauges.size", gauges_size);
stats->set("max_gauges.size", max_gauges_size);
stats->set("mono_counters.size", mono_counters_size);
stats->set("mono_counters_uint.size", mono_counters_uint_size);
stats->set("timers.size", timers_size);
stats->set("total.size", total);
obj->set("stats", stats);

res.setStatus(HTTPResponse::HTTP_OK);
res.setContentType("application/json");
obj->stringify(res.send());
Expand Down
15 changes: 8 additions & 7 deletions admin/admin_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,17 @@ TEST_F(AdminServerTest, GET_metrics) {
Var result {parser.parse(rr)};
Object::Ptr object = result.extract<Object::Ptr>();

int count = 0;
std::vector<std::string> expected_keys {"age_gauges", "counters", "dist_summaries",
"gauges", "max_gauges", "mono_counters", "timers"};
std::vector<std::string> expected_keys {"age_gauges", "counters", "dist_summaries", "gauges",
"max_gauges", "mono_counters", "mono_counters_uint",
"stats", "timers"};
std::vector<std::string> found_keys;

for (auto &it : *object) {
if (contains(expected_keys, it.first)) {
count += 1;
}
found_keys.emplace_back(it.first);
}
std::sort(found_keys.begin(), found_keys.end());

EXPECT_EQ(count, expected_keys.size());
EXPECT_EQ(found_keys, expected_keys);
}

TEST_F(AdminServerTest, GET_undefined_route) {
Expand Down
106 changes: 54 additions & 52 deletions server/spectatord.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,27 @@ static void update_statsd_metric(spectator::Registry* registry,
}
}

// parse a single line of the form:
// <METRIC_NAME>:<VALUE>|<TYPE>|@<SAMPLE_RATE>|#<TAG_KEY_1>:<TAG_VALUE_1>,<TAG_2>
// See:
// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics
// examples:
// custom_metric:60|g|#shell - Set the custom_metric gauge to 60 and tag it
// with 'shell' page.views:1|c - Increment the page.views COUNT metric.
// fuel.level:0.5|g - Record the fuel tank is half-empty.
// song.length:240|h|@0.5 - Sample the song.length histogram half of the time.
// users.uniques:1234|s - Track unique visitors to the site.
// users.online:1|c|#country:china - Increment the active users COUNT metric
// and tag by country of origin. users.online:1|c|@0.5|#country:china - Track
// active China users and use a sample rate.
/* StatsD Protocol Parser Specification
*
* Parse a single line, of the following form:
*
* <METRIC_NAME>:<VALUE>|<TYPE>|@<SAMPLE_RATE>|#<TAG_KEY_1>:<TAG_VALUE_1>,<TAG_2>
*
* Reference Link:
*
* https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics
*
* Examples:
*
* custom_metric:60|g|#shell - Set the custom_metric gauge to 60 and tag it
* with 'shell' page.views:1|c - Increment the page.views COUNT metric.
* fuel.level:0.5|g - Record the fuel tank is half-empty.
* song.length:240|h|@0.5 - Sample the song.length histogram half of the time.
* users.uniques:1234|s - Track unique visitors to the site.
* users.online:1|c|#country:china - Increment the active users COUNT metric and tag by
* country of origin.
* users.online:1|c|@0.5|#country:china - Track active China users and use a sample rate.
*/
auto Server::parse_statsd_line(const char* buffer)
-> std::optional<std::string> {
assert(buffer != nullptr);
Expand Down Expand Up @@ -151,7 +159,6 @@ auto Server::parse_statsd_line(const char* buffer)
char char_type = *p;
switch (char_type) {
case 'c':
// counter
type = StatsdMetricType::Counter;
break;
case 'g':
Expand Down Expand Up @@ -254,7 +261,7 @@ auto get_measurement(char type, std::string_view measurement_str, std::string* e
auto value_str = measurement_str.begin() + pos;
char* last_char = nullptr;
valueT value{};
if (type == 'C' || type == 'X') {
if (type == 'U') {
value.u = std::strtoull(value_str, &last_char, 10);
} else {
value.d = std::strtod(value_str, &last_char);
Expand All @@ -265,7 +272,7 @@ auto get_measurement(char type, std::string_view measurement_str, std::string* e
return {};
}
if (*last_char != '\0' && std::isspace(*last_char) == 0) {
if (type == 'C' || type == 'X') {
if (type == 'U') {
*err_msg = fmt::format("Got {} parsing value, ignoring chars starting at {}",
value.u, last_char);
} else {
Expand Down Expand Up @@ -307,8 +314,7 @@ Server::Server(int port_number, std::optional<int> statsd_port_number,
static void prepare_socket_path(const std::string& socket_path) {
::unlink(socket_path.c_str());

// TODO: Migrate to std::filesystem after we verify it works on all
// our platforms
// TODO: Migrate to std::filesystem after we verify it works on all our platforms
auto last = socket_path.rfind('/');
if (last != std::string::npos) {
auto dir = socket_path.substr(0, last);
Expand All @@ -317,8 +323,7 @@ static void prepare_socket_path(const std::string& socket_path) {
::mkdir(dir.c_str(), 0777);
}

// We don't want to restrict the permissions on the socket path
// so any user can send metrics
// We don't want to restrict the permissions on the socket path so any user can send metrics
umask(0);
}

Expand Down Expand Up @@ -530,57 +535,54 @@ auto Server::parse_line(const char* buffer) -> std::optional<std::string> {
Logger()->info("While parsing {}: {}", p, err_msg);
}
switch (type) {
case 't':
// timer, elapsed time is reported in seconds
{
auto nanos = static_cast<int64_t>(measurement->value.d * 1e9);
registry_->GetTimer(measurement->id)
->Record(std::chrono::nanoseconds(nanos));
case 'A':
if (measurement->value.d == 0) {
registry_->GetAgeGauge(measurement->id)->UpdateLastSuccess();
} else {
registry_->GetAgeGauge(measurement->id)->UpdateLastSuccess(
static_cast<int64_t>(measurement->value.d * 1e9));
}
break;
case 'c':
// counter
registry_->GetCounter(measurement->id)->Add(measurement->value.d);
break;
case 'C':
// monotonic counters
registry_->GetMonotonicCounter(measurement->id)->Set(measurement->value.u);
registry_->GetMonotonicCounter(measurement->id)->Set(measurement->value.d);
break;
case 'd':
registry_->GetDistributionSummary(measurement->id)->Record(measurement->value.d);
break;
case 'D':
perc_ds_.get_or_create(registry_, measurement->id)->Record(
static_cast<int64_t>(measurement->value.d));
break;
case 'g':
// gauge
if (extra > 0) {
registry_->GetGauge(measurement->id, absl::Seconds(extra))
->Set(measurement->value.d);
registry_->GetGauge(measurement->id, absl::Seconds(extra))->Set(measurement->value.d);
} else {
// this preserves the previous Ttl, otherwise we would override it
// with the default value if we use the previous constructor
// this preserves the previous ttl, otherwise we would override it
// with the default value, if we use the previous constructor
registry_->GetGauge(measurement->id)->Set(measurement->value.d);
}
break;
case 'm':
registry_->GetMaxGauge(measurement->id)->Update(measurement->value.d);
break;
case 'A':
if (measurement->value.d == 0) {
registry_->GetAgeGauge(measurement->id)->UpdateLastSuccess();
} else {
registry_->GetAgeGauge(measurement->id)
->UpdateLastSuccess(static_cast<int64_t>(measurement->value.d * 1e9));
case 't': // elapsed time is reported in seconds
{
auto nanos = static_cast<int64_t>(measurement->value.d * 1e9);
registry_->GetTimer(measurement->id)->Record(std::chrono::nanoseconds(nanos));
}
break;
case 'd':
// dist summary
registry_->GetDistributionSummary(measurement->id)
->Record(measurement->value.d);
case 'T':
{
auto nanos = static_cast<int64_t>(measurement->value.d * 1e9);
perc_timers_.get_or_create(registry_, measurement->id)->Record(
std::chrono::nanoseconds(nanos));
}
break;
case 'T': {
auto nanos = static_cast<int64_t>(measurement->value.d * 1e9);
perc_timers_.get_or_create(registry_, measurement->id)
->Record(std::chrono::nanoseconds(nanos));
} break;
case 'D':
perc_ds_.get_or_create(registry_, measurement->id)
->Record(static_cast<int64_t>(measurement->value.d));
case 'U':
registry_->GetMonotonicCounterUint(measurement->id)->Set(measurement->value.u);
break;
case 'X':
if (extra > 0) {
Expand Down
Loading

0 comments on commit 4e0f274

Please sign in to comment.