diff --git a/README.md b/README.md index a4e0b16..5423157 100644 --- a/README.md +++ b/README.md @@ -103,9 +103,9 @@ section of nginx configuration. metric names on output. * `error_metric_name` (string): Can be used to change the default name of error metric (see [Built-in metrics](#built-in-metrics) for details). - * `sync_interval` (number): sets per-worker counter sync interval in seconds. - This sets the boundary on eventual consistency of counter metrics. Defaults - to 1. + * `sync_interval` (number): sets the sync interval for per-worker counters and + key index (in seconds). This sets the boundary on eventual consistency of + counter metric increments, and metric resets/deletions. Defaults to 1. * `lookup_max_size` (number): maximum size of a per-metric lookup table maintained by each worker to cache full metric names. Defaults to 1000. If you have metrics with extremely high cardinality and lots of available diff --git a/prometheus.lua b/prometheus.lua index 2ff3aa8..59f39d8 100644 --- a/prometheus.lua +++ b/prometheus.lua @@ -111,7 +111,7 @@ local ERR_MSG_COUNTER_NOT_INITIALIZED = "counter not initialized! " .. -- Error message that gets logged when the shared dictionary gets full. local ERR_MSG_LRU_EVICTION = "Shared dictionary used for prometheus metrics " .. "is full. REPORTED METRIC DATA MIGHT BE INCOMPLETE. Please increase the " .. - "size of the dictionary or decrease metric cardinality." + "size of the dictionary or reduce metric cardinality." -- Accepted range of byte values for tailing bytes of utf8 strings. -- This is defined outside of the validate_utf8_string function as a const @@ -384,8 +384,7 @@ local function lookup_or_create(self, label_values) end if self.lookup_size >= self.lookup_max_size then - self.lookup_size = 0 - self.lookup = {} + self:reset_lookup() end local t = self.lookup @@ -623,6 +622,12 @@ local function observe(self, value, label_values) end end +-- Reset the metric name lookup table for a given metric. +local function reset_lookup(self) + self.lookup_size = 0 + self.lookup = {} +end + -- Delete all metrics for a given gauge, counter or a histogram. -- -- This is like `del`, but will delete all time series for all previously @@ -691,7 +696,7 @@ local function reset(self) end -- Clean up the full metric name lookup table as well. - self.lookup = {} + self:reset_lookup() end -- Initialize the module. @@ -738,7 +743,22 @@ function Prometheus.init(dict_name, options_or_prefix) end self.registry = {} - self.key_index = key_index_lib.new(self.dict, KEY_INDEX_PREFIX) + self.key_index = key_index_lib.new(self.dict, KEY_INDEX_PREFIX, function(metric_key) + -- When another worker calls reset or del on a metric, reset that + -- metric's local lookup table. + local metric_name = ngx_re_gsub(metric_key, "{.*", "", "jo") + if self.registry[metric_name] then + local m = self.registry[metric_name] + m:reset_lookup() + return + end + + metric_name = ngx_re_gsub(metric_name, "_sum$", "", "jo") + local m = self.registry[metric_name] + if m and m.typ == TYPE_HISTOGRAM then + m:reset_lookup() + end + end) self.initialized = true @@ -757,7 +777,7 @@ end -- Initialize the worker counter. -- --- This can call this function from the `init_worker_by_lua` if you are calling +-- You can call this function from the `init_worker_by_lua` if you are calling -- Prometheus.init() from `init_by_lua`, but this is deprecated. Instead, just -- call Prometheus.init() from `init_worker_by_lua_block` and pass sync_interval -- as part of the `options` argument if you need. @@ -782,6 +802,10 @@ function Prometheus:init_worker(sync_interval) error(err, 2) end self._counter = counter_instance + + ngx.timer.every(self.sync_interval, function (premature) + self.key_index:sync() + end) end -- Register a new metric. @@ -844,6 +868,7 @@ local function register(self, name, help, label_names, buckets, typ) lookup = {}, lookup_size = 0, lookup_max_size = self.lookup_max_size, + reset_lookup = reset_lookup, parent = self, -- Store a reference for logging functions for faster lookup. _log_error = function(...) self:log_error(...) end, diff --git a/prometheus_keys.lua b/prometheus_keys.lua index cc763a9..389e8a1 100644 --- a/prometheus_keys.lua +++ b/prometheus_keys.lua @@ -8,7 +8,7 @@ local KeyIndex = {} KeyIndex.__index = KeyIndex -function KeyIndex.new(shared_dict, prefix) +function KeyIndex.new(shared_dict, prefix, delete_callback) local self = setmetatable({}, KeyIndex) self.dict = shared_dict self.key_prefix = prefix .. "key_" @@ -18,6 +18,7 @@ function KeyIndex.new(shared_dict, prefix) self.deleted = 0 self.keys = {} self.index = {} + self.delete_callback = delete_callback return self end @@ -45,6 +46,7 @@ function KeyIndex:sync_range(first, last) self.keys[i] = key self.index[key] = i elseif self.keys[i] then + self.delete_callback(self.keys[i]) self.index[self.keys[i]] = nil self.keys[i] = nil end @@ -107,6 +109,7 @@ end -- Args: -- key: String value of the key, must exists in this index. function KeyIndex:remove(key, err_msg_lru_eviction) + self:sync() local i = self.index[key] if i then self.index[key] = nil @@ -114,7 +117,7 @@ function KeyIndex:remove(key, err_msg_lru_eviction) self.dict:set(self.key_prefix .. i, nil) self.deleted = self.deleted + 1 - -- increment delete_count to signalize other workers that they should do a full sync + -- increment delete_count to signal other workers that they should do a full sync local _, err, forcible = self.dict:incr(self.delete_count, 1, 0) if err or forcible then return err or err_msg_lru_eviction diff --git a/prometheus_test.lua b/prometheus_test.lua index d0b4f8e..ba98b1b 100644 --- a/prometheus_test.lua +++ b/prometheus_test.lua @@ -104,13 +104,18 @@ function TestPrometheus:setUp() self.dict = setmetatable({}, SimpleDict) ngx.shared.metrics = self.dict self.p = require('prometheus').init('metrics') + -- Another instance of the library to simulate a second nginx worker. + self.p2 = require('prometheus').init('metrics') self.counter1 = self.p:counter("metric1", "Metric 1") self.counter2 = self.p:counter("metric2", "Metric 2", {"f2", "f1"}) self.counter3 = self.p:counter("metric3", "Metric 3", {"f3"}) self.counter4 = self.p:counter("metric4", "Metric 4", {"f1","f2","f3"}) self.gauge1 = self.p:gauge("gauge1", "Gauge 1") + self.gauge1_p2 = self.p2:gauge("gauge1", "Gauge 1") self.gauge2 = self.p:gauge("gauge2", "Gauge 2", {"f2", "f1"}) + self.gauge2_p2 = self.p2:gauge("gauge2", "Gauge 2", {"f2", "f1"}) self.hist1 = self.p:histogram("l1", "Histogram 1") + self.hist1_p2 = self.p2:histogram("l1", "Histogram 1") self.hist2 = self.p:histogram("l2", "Histogram 2", {"var", "site"}) end function TestPrometheus.tearDown() @@ -411,6 +416,7 @@ function TestPrometheus:testReset() self.gauge1:reset() self.p.key_index:sync() luaunit.assertEquals(self.dict:get("gauge1"), nil) + luaunit.assertEquals(self.gauge1.lookup, {}) luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0) self.gauge1:inc(3) @@ -522,6 +528,43 @@ function TestPrometheus:testReset() luaunit.assertEquals(self.dict:get('l2_sum{var="ok",site="site1"}'), nil) luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0) + -- Set a gauge value that will be reset by another worker. + self.gauge1:set(42) + self.gauge2:set(91, {"a", "b1"}) + self.gauge2:set(92, {"a", "b2"}) + luaunit.assertEquals(self.dict:get("gauge1"), 42) + luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b1"}'), 91) + luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b2"}'), 92) + luaunit.assertNotEquals(self.gauge1.lookup, {}) + luaunit.assertNotEquals(self.gauge2.lookup, {}) + luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0) + + -- After another worker has reset the metric, confirm that the per-metric + -- lookup table has been reset. + self.gauge1_p2:reset() + self.gauge2_p2:del({"a", "b1"}) + self.p.key_index:sync() + luaunit.assertEquals(self.dict:get("gauge1"), nil) + luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b1"}'), nil) + luaunit.assertEquals(self.dict:get('gauge2{f2="a",f1="b2"}'), 92) + luaunit.assertEquals(self.gauge1.lookup, {}) + luaunit.assertEquals(self.gauge2.lookup, {}) + luaunit.assertEquals(self.dict:get("nginx_metric_errors_total"), 0) + + -- Similarly, check that a histogram metric reset by another worker + -- results in metric lookup table being reset. + self.hist1:reset() + self.hist1:observe(0.44) + self.p._counter:sync() + luaunit.assertEquals(self.dict:get('l1_sum'), 0.44) + luaunit.assertNotEquals(self.hist1.lookup, {}) + + self.hist1_p2:reset() + self.p.key_index:sync() + luaunit.assertEquals(self.dict:get('l1_sum'), nil) + luaunit.assertEquals(self.hist1.lookup, {}) + luaunit.assertEquals(self.hist1_p2.lookup, {}) + -- key not exist self.gauge2:inc(4, {"key_not_exist", "key_not_exist"}) self.gauge2:reset() @@ -705,7 +748,9 @@ TestKeyIndex = {} function TestKeyIndex:setUp() self.dict = setmetatable({}, SimpleDict) ngx.shared.metrics = self.dict - self.key_index = require('prometheus_keys').new(self.dict, '_prefix_') + self.key_index = require('prometheus_keys').new(self.dict, '_prefix_', function(key) + self.last_deleted_key = key + end) end function TestKeyIndex.tearDown() ngx.logs = nil @@ -795,13 +840,13 @@ function TestKeyIndex:testSync() -- key deleted by another worker self.dict:set("_prefix_delete_count", 1) self.dict:delete("_prefix_key_2") - self.dict:set("_prefix_key_3", "key3") self.key_index:sync() keys = self.key_index:list() luaunit.assertEquals(ngx.logs, nil) luaunit.assertEquals(#keys, 2) luaunit.assertEquals(keys[1], "key1") luaunit.assertEquals(keys[2], "key3") + luaunit.assertEquals(self.last_deleted_key, "key2") end function TestPrometheus:testLookupMaxSize()