Skip to content

Commit

Permalink
Reset metric name lookup table when another worker resets the metric
Browse files Browse the repository at this point in the history
The metric key index now accepts a function that gets called every time
a key is deleted from the index, which happens at the next sync after
another worker had deleted a key. The callback function finds the same
metric in the local worker's registry and cleans up its lookup table.

To make sure this happens deterministically for all workers, the key
index sync is now also scheduled to run every `sync_interval` (1s
by default).

The goals are:
- to prevent unbounded growth of the per-metric lookup tables;
- to ensure that previously reset metrics can be used again.
  • Loading branch information
knyar committed Apr 17, 2024
1 parent 2982df8 commit f7c83a1
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 13 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ section of nginx configuration.
metric names on output.
* `error_metric_name` (string): Can be used to change the default name of
error metric (see [Built-in metrics](#built-in-metrics) for details).
* `sync_interval` (number): sets per-worker counter sync interval in seconds.
This sets the boundary on eventual consistency of counter metrics. Defaults
to 1.
* `sync_interval` (number): sets the sync interval for per-worker counters and
key index (in seconds). This sets the boundary on eventual consistency of
counter metric increments, and metric resets/deletions. Defaults to 1.
* `lookup_max_size` (number): maximum size of a per-metric lookup table
maintained by each worker to cache full metric names. Defaults to 1000.
If you have metrics with extremely high cardinality and lots of available
Expand Down
37 changes: 31 additions & 6 deletions prometheus.lua
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ local ERR_MSG_COUNTER_NOT_INITIALIZED = "counter not initialized! " ..
-- Error message that gets logged when the shared dictionary gets full.
local ERR_MSG_LRU_EVICTION = "Shared dictionary used for prometheus metrics " ..
"is full. REPORTED METRIC DATA MIGHT BE INCOMPLETE. Please increase the " ..
"size of the dictionary or decrease metric cardinality."
"size of the dictionary or reduce metric cardinality."

-- Accepted range of byte values for tailing bytes of utf8 strings.
-- This is defined outside of the validate_utf8_string function as a const
Expand Down Expand Up @@ -384,8 +384,7 @@ local function lookup_or_create(self, label_values)
end

if self.lookup_size >= self.lookup_max_size then
self.lookup_size = 0
self.lookup = {}
self:reset_lookup()
end

local t = self.lookup
Expand Down Expand Up @@ -623,6 +622,12 @@ local function observe(self, value, label_values)
end
end

-- Reset the metric name lookup table for a given metric.
local function reset_lookup(self)
self.lookup_size = 0
self.lookup = {}
end

-- Delete all metrics for a given gauge, counter or a histogram.
--
-- This is like `del`, but will delete all time series for all previously
Expand Down Expand Up @@ -691,7 +696,7 @@ local function reset(self)
end

-- Clean up the full metric name lookup table as well.
self.lookup = {}
self:reset_lookup()
end

-- Initialize the module.
Expand Down Expand Up @@ -738,7 +743,22 @@ function Prometheus.init(dict_name, options_or_prefix)
end

self.registry = {}
self.key_index = key_index_lib.new(self.dict, KEY_INDEX_PREFIX)
self.key_index = key_index_lib.new(self.dict, KEY_INDEX_PREFIX, function(metric_key)
-- When another worker calls reset or del on a metric, reset that
-- metric's local lookup table.
local metric_name = ngx_re_gsub(metric_key, "{.*", "", "jo")
if self.registry[metric_name] then
local m = self.registry[metric_name]
m:reset_lookup()
return
end

metric_name = ngx_re_gsub(metric_name, "_sum$", "", "jo")
local m = self.registry[metric_name]
if m and m.typ == TYPE_HISTOGRAM then
m:reset_lookup()
end
end)

self.initialized = true

Expand All @@ -757,7 +777,7 @@ end

-- Initialize the worker counter.
--
-- This can call this function from the `init_worker_by_lua` if you are calling
-- You can call this function from the `init_worker_by_lua` if you are calling
-- Prometheus.init() from `init_by_lua`, but this is deprecated. Instead, just
-- call Prometheus.init() from `init_worker_by_lua_block` and pass sync_interval
-- as part of the `options` argument if you need.
Expand All @@ -782,6 +802,10 @@ function Prometheus:init_worker(sync_interval)
error(err, 2)
end
self._counter = counter_instance

ngx.timer.every(self.sync_interval, function (premature)
self.key_index:sync()
end)
end

-- Register a new metric.
Expand Down Expand Up @@ -844,6 +868,7 @@ local function register(self, name, help, label_names, buckets, typ)
lookup = {},
lookup_size = 0,
lookup_max_size = self.lookup_max_size,
reset_lookup = reset_lookup,
parent = self,
-- Store a reference for logging functions for faster lookup.
_log_error = function(...) self:log_error(...) end,
Expand Down
7 changes: 5 additions & 2 deletions prometheus_keys.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
local KeyIndex = {}
KeyIndex.__index = KeyIndex

function KeyIndex.new(shared_dict, prefix)
function KeyIndex.new(shared_dict, prefix, delete_callback)
local self = setmetatable({}, KeyIndex)
self.dict = shared_dict
self.key_prefix = prefix .. "key_"
Expand All @@ -18,6 +18,7 @@ function KeyIndex.new(shared_dict, prefix)
self.deleted = 0
self.keys = {}
self.index = {}
self.delete_callback = delete_callback
return self
end

Expand Down Expand Up @@ -45,6 +46,7 @@ function KeyIndex:sync_range(first, last)
self.keys[i] = key
self.index[key] = i
elseif self.keys[i] then
self.delete_callback(self.keys[i])
self.index[self.keys[i]] = nil
self.keys[i] = nil
end
Expand Down Expand Up @@ -107,14 +109,15 @@ end
-- Args:
-- key: String value of the key, must exists in this index.
function KeyIndex:remove(key, err_msg_lru_eviction)
self:sync()
local i = self.index[key]
if i then
self.index[key] = nil
self.keys[i] = nil
self.dict:set(self.key_prefix .. i, nil)
self.deleted = self.deleted + 1

-- increment delete_count to signalize other workers that they should do a full sync
-- increment delete_count to signal other workers that they should do a full sync
local _, err, forcible = self.dict:incr(self.delete_count, 1, 0)
if err or forcible then
return err or err_msg_lru_eviction
Expand Down
49 changes: 47 additions & 2 deletions prometheus_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,18 @@ function TestPrometheus:setUp()
self.dict = setmetatable({}, SimpleDict)
ngx.shared.metrics = self.dict
self.p = require('prometheus').init('metrics')
-- Another instance of the library to simulate a second nginx worker.
self.p2 = require('prometheus').init('metrics')
self.counter1 = self.p:counter("metric1", "Metric 1")
self.counter2 = self.p:counter("metric2", "Metric 2", {"f2", "f1"})
self.counter3 = self.p:counter("metric3", "Metric 3", {"f3"})
self.counter4 = self.p:counter("metric4", "Metric 4", {"f1","f2","f3"})
self.gauge1 = self.p:gauge("gauge1", "Gauge 1")
self.gauge1_p2 = self.p2:gauge("gauge1", "Gauge 1")
self.gauge2 = self.p:gauge("gauge2", "Gauge 2", {"f2", "f1"})
self.gauge2_p2 = self.p2:gauge("gauge2", "Gauge 2", {"f2", "f1"})
self.hist1 = self.p:histogram("l1", "Histogram 1")
self.hist1_p2 = self.p2:histogram("l1", "Histogram 1")
self.hist2 = self.p:histogram("l2", "Histogram 2", {"var", "site"})
end
function TestPrometheus.tearDown()
Expand Down Expand Up @@ -411,6 +416,7 @@ function TestPrometheus:testReset()
self.gauge1:reset()
self.p.key_index:sync()
luaunit.assertEquals(self.dict:get("gauge1"), nil)
luaunit.assertEquals(self.gauge1.lookup, {})
luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0)

self.gauge1:inc(3)
Expand Down Expand Up @@ -522,6 +528,43 @@ function TestPrometheus:testReset()
luaunit.assertEquals(self.dict:get('l2_sum{var="ok",site="site1"}'), nil)
luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0)

-- Set a gauge value that will be reset by another worker.
self.gauge1:set(42)
self.gauge2:set(91, {"a", "b1"})
self.gauge2:set(92, {"a", "b2"})
luaunit.assertEquals(self.dict:get("gauge1"), 42)
luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b1"}'), 91)
luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b2"}'), 92)
luaunit.assertNotEquals(self.gauge1.lookup, {})
luaunit.assertNotEquals(self.gauge2.lookup, {})
luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0)

-- After another worker has reset the metric, confirm that the per-metric
-- lookup table has been reset.
self.gauge1_p2:reset()
self.gauge2_p2:del({"a", "b1"})
self.p.key_index:sync()
luaunit.assertEquals(self.dict:get("gauge1"), nil)
luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b1"}'), nil)
luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b2"}'), 92)
luaunit.assertEquals(self.gauge1.lookup, {})
luaunit.assertEquals(self.gauge2.lookup, {})
luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0)

-- Similarly, check that a histogram metric reset by another worker
-- results in metric lookup table being reset.
self.hist1:reset()
self.hist1:observe(0.44)
self.p._counter:sync()
luaunit.assertEquals(self.dict:get('l1_sum'), 0.44)
luaunit.assertNotEquals(self.hist1.lookup, {})

self.hist1_p2:reset()
self.p.key_index:sync()
luaunit.assertEquals(self.dict:get('l1_sum'), nil)
luaunit.assertEquals(self.hist1.lookup, {})
luaunit.assertEquals(self.hist1_p2.lookup, {})

-- key not exist
self.gauge2:inc(4, {"key_not_exist", "key_not_exist"})
self.gauge2:reset()
Expand Down Expand Up @@ -705,7 +748,9 @@ TestKeyIndex = {}
function TestKeyIndex:setUp()
self.dict = setmetatable({}, SimpleDict)
ngx.shared.metrics = self.dict
self.key_index = require('prometheus_keys').new(self.dict, '_prefix_')
self.key_index = require('prometheus_keys').new(self.dict, '_prefix_', function(key)
self.last_deleted_key = key
end)
end
function TestKeyIndex.tearDown()
ngx.logs = nil
Expand Down Expand Up @@ -795,13 +840,13 @@ function TestKeyIndex:testSync()
-- key deleted by another worker
self.dict:set("_prefix_delete_count", 1)
self.dict:delete("_prefix_key_2")
self.dict:set("_prefix_key_3", "key3")
self.key_index:sync()
keys = self.key_index:list()
luaunit.assertEquals(ngx.logs, nil)
luaunit.assertEquals(#keys, 2)
luaunit.assertEquals(keys[1], "key1")
luaunit.assertEquals(keys[2], "key3")
luaunit.assertEquals(self.last_deleted_key, "key2")
end

function TestPrometheus:testLookupMaxSize()
Expand Down

0 comments on commit f7c83a1

Please sign in to comment.