From 6697cd9cc2bb33f8d8b55d90c3105336993ba3a2 Mon Sep 17 00:00:00 2001 From: Ben Keith Date: Fri, 10 Nov 2017 16:47:38 -0500 Subject: [PATCH] Making list length fetching support globs Making each configured instance have its own class to avoid global variables --- README.md | 1 + redis_client.py | 64 ++++++++ redis_info.py | 420 ++++++++++++++++++++++-------------------------- 3 files changed, 260 insertions(+), 225 deletions(-) create mode 100644 redis_client.py diff --git a/README.md b/README.md index 8e44a1e..18b79d4 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ You can capture any kind of Redis metrics like: * Uptime * Changes since last save * Replication delay (per slave) + * The length of list values Installation and Configuration diff --git a/redis_client.py b/redis_client.py new file mode 100644 index 0000000..0a2a514 --- /dev/null +++ b/redis_client.py @@ -0,0 +1,64 @@ +import socket + +class RedisClient(): + def __init__(self, host, port, auth): + self.host = host + self.port = port + self.auth = auth + + def connect(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.host, self.port)) + + self.file = self.socket.makefile('r') + + if self.auth is not None: + self.send('auth %s' % (self.auth)) + + self.read_response() + + def __enter__(self): + self.connect() + return self + + def __exit__(self, *args): + if self.socket: + self.socket.close() + + def send(self, message): + return self.socket.sendall("%s\r\n" % message) + + def read_response(self): + first_line = self.file.readline() + if first_line.startswith('-'): + raise RedisError(first_line) + + if first_line.startswith('*'): + return self.read_array(first_line) + elif first_line.startswith('$'): + return self.read_bulk_string(first_line) + elif first_line.startswith(':'): + return first_line.lstrip(':').rstrip() + elif first_line.startswith('+'): + return first_line.lstrip('+').rstrip() + else: + raise ValueError("Unknown Redis response: %s" % first_line) + + def read_array(self, first_line): + size = int(first_line.lstrip("*").rstrip()) + return [self.read_response() for i in range(0, size)] + + def read_bulk_string(self, first_line): + size = int(first_line.lstrip("$").rstrip()) + if size == -1: + return None + + s = self.file.read(size) + # Get rid of \r\n at end + self.file.read(2) + + return s + + +class RedisError(Exception): + pass diff --git a/redis_info.py b/redis_info.py index 803e620..f94f7a5 100644 --- a/redis_info.py +++ b/redis_info.py @@ -32,131 +32,184 @@ import collectd import socket import re - -# Verbose logging on/off. Override in config by specifying 'Verbose'. -VERBOSE_LOGGING = False - -CONFIGS = [] -REDIS_INFO = {} - - -def fetch_info(conf): - """Connect to Redis server and request info""" - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect((conf['host'], conf['port'])) - log_verbose('Connected to Redis at %s:%s' % (conf['host'], - conf['port'])) - except socket.error, e: - collectd.error('redis_info plugin: Error connecting to %s:%d - %r' - % (conf['host'], conf['port'], e)) - return None - - fp = s.makefile('r') - - if conf['auth'] is not None: - log_verbose('Sending auth command') - s.sendall('auth %s\r\n' % (conf['auth'])) - - status_line = fp.readline() - if not status_line.startswith('+OK'): - # -ERR invalid password - # -ERR Client sent AUTH, but no password is set - collectd.error( - 'redis_info plugin: Error sending auth to %s:%d - %r' - % (conf['host'], conf['port'], status_line)) +from redis_client import RedisClient, RedisError + + +class RedisCollector(): + def __init__(self, host, port, auth, instance, metric_types, verbose, llen_keys): + self.host = host + self.port = port + self.auth = auth + self.instance = instance + self.metric_types = metric_types + self.verbose = verbose + self.llen_keys = llen_keys + + def fetch_info(self, client): + """Request info from the Redis server""" + self.log_verbose('Sending info command') + client.send('info') + try: + data = client.read_response() + except RedisError, e: + collectd.error('redis_info plugin: Error response from %s:%d - %r' + % (self.host, self.port, e)) return None - log_verbose('Sending info command') - s.sendall('info\r\n') - status_line = fp.readline() - - if status_line.startswith('-'): - collectd.error('redis_info plugin: Error response from %s:%d - %r' - % (conf['host'], conf['port'], status_line)) - s.close() - return None - - # status_line looks like: $ - content_length = int(status_line[1:-1]) - data = fp.read(content_length) - log_verbose('Received data: %s' % data) - - linesep = '\r\n' if '\r\n' in data else '\n' - info_dict = parse_info(data.split(linesep)) - fp.close() - - # monitoring lengths of custom keys - key_dict = get_llen_keys(s, conf) - info_dict.update(key_dict) - - s.close() - - return info_dict - - -def get_llen_keys(socket, conf): - """ - for each llen_key specified in the config file, - grab the length of that key from the corresponding - database index. return a dictionary of the - keys - """ - llen_fp = socket.makefile('r') - key_dict = {} - if len(conf['llen_keys']) > 0: - for db, keys in conf['llen_keys'].items(): - socket.sendall('select %d\r\n' % db) - status_line = llen_fp.readline() # +OK - for key in keys: - socket.sendall('llen %s\r\n' % key) - status_line = llen_fp.readline() # :VALUE - try: - val = int(filter(str.isdigit, status_line)) - except ValueError: - collectd.warning('redis_info plugin: key %s is not of type list, cannot get length' % key) - - subkey = "db{0}_llen_{1}".format(db, key) - key_dict.update({subkey: val}) - - llen_fp.close() - return key_dict - - -def parse_info(info_lines): - """Parse info response from Redis""" - info = {} - for line in info_lines: - if "" == line or line.startswith('#'): - continue - - if ':' not in line: - collectd.warning('redis_info plugin: Bad format for info line: %s' - % line) - continue - - key, val = line.split(':') - - # Handle multi-value keys (for dbs and slaves). - # db lines look like "db0:keys=10,expire=0" - # slave lines look like - # "slave0:ip=192.168.0.181,port=6379, - # state=online,offset=1650991674247,lag=1" - if ',' in val: - split_val = val.split(',') - for sub_val in split_val: - k, _, v = sub_val.rpartition('=') - sub_key = "{0}_{1}".format(key, k) - info[sub_key] = v - else: - info[key] = val - - # compatibility with pre-2.6 redis (used changes_since_last_save) - info["changes_since_last_save"] = info.get("changes_since_last_save", - info.get( - "rdb_changes_since_last_save")) + self.log_verbose('Received data: %s' % data) + + linesep = '\r\n' if '\r\n' in data else '\n' + info_dict = self.parse_info(data.split(linesep)) + + return info_dict + + def parse_info(self, info_lines): + """Parse info response from Redis""" + info = {} + for line in info_lines: + if "" == line or line.startswith('#'): + continue + + if ':' not in line: + collectd.warning('redis_info plugin: Bad format for info line: %s' + % line) + continue + + key, val = line.split(':') + + # Handle multi-value keys (for dbs and slaves). + # db lines look like "db0:keys=10,expire=0" + # slave lines look like + # "slave0:ip=192.168.0.181,port=6379, + # state=online,offset=1650991674247,lag=1" + if ',' in val: + split_val = val.split(',') + for sub_val in split_val: + k, _, v = sub_val.rpartition('=') + sub_key = "{0}_{1}".format(key, k) + info[sub_key] = v + else: + info[key] = val + + # compatibility with pre-2.6 redis (used changes_since_last_save) + info["changes_since_last_save"] = info.get("changes_since_last_save", + info.get( + "rdb_changes_since_last_save")) + + return info + + def dispatch_info(self, client): + info = self.fetch_info(client) + + if not info: + collectd.error('redis plugin: No info received') + return + + for keyTuple, val in self.metric_types.iteritems(): + key, mtype = keyTuple + if key == 'total_connections_received' and mtype == 'counter': + self.dispatch_value(info['total_connections_received'], + 'counter', + type_instance='connections_received') + elif key == 'total_commands_processed' and mtype == 'counter': + self.dispatch_value(info['total_commands_processed'], + 'counter', + type_instance='commands_processed') + else: + self.dispatch_value(info[key], mtype, type_instance=key) + + def dispatch_list_lengths(self, client): + """ + For each llen_key specified in the config file, + grab the length of that key from the corresponding + database index. + """ + key_dict = {} + for db, patterns in self.llen_keys.items(): + client.send('select %d' % db) + try: + resp = client.read_response() + except RedisError, e: + collectd.error("Could not select Redis db %s: %s" % (db, e)) + continue + + for pattern in patterns: + keys = [] + # If there is a glob, get every key matching it + if '*' in pattern: + client.send('KEYS %s' % pattern) + keys = client.read_response() + else: + keys = [pattern] + + for key in keys: + self.fetch_and_dispatch_llen_for_key(client, key, db) + + def fetch_and_dispatch_llen_for_key(self, client, key, db): + client.send('llen %s' % key) + try: + val = client.read_response() + except RedisError, e: + collectd.warning('redis_info plugin: could not get length of key %s in db %s: %s' % (key, db, e)) + return + + dimensions = {'key_name': key, 'db_index': db} + self.dispatch_value(val, 'gauge', type_instance='key_llen', dimensions=dimensions) + + def dispatch_value(self, value, type, plugin_instance=None, type_instance=None, + dimensions={}): + """Read a key from info response data and dispatch a value""" + + try: + value = int(value) + except ValueError: + value = float(value) + + self.log_verbose('Sending value: %s=%s (%s)' % (type_instance, value, dimensions)) + + val = collectd.Values(plugin='redis_info') + val.type = type + val.type_instance = type_instance + val.values = [value] + + plugin_instance = self.instance + if plugin_instance is None: + plugin_instance = '{host}:{port}'.format(host=self.host, + port=self.port) + + val.plugin_instance = "{0}{1}".format(plugin_instance, _format_dimensions(dimensions)) + + # With some versions of CollectD, a dummy metadata map must be added + # to each value for it to be correctly serialized to JSON by the + # write_http plugin. See + # https://github.com/collectd/collectd/issues/716 + val.meta = {'0': True} + val.dispatch() + + def read_callback(self): + try: + self.get_metrics() + except socket.error, e: + collectd.error('redis_info plugin: Error connecting to %s:%d - %r' + % (self.host, self.port, e)) + return + except RedisError, e: + collectd.error('redis_info plugin: Error getting metrics from %s:%d - %r' + % (self.host, self.port, e)) + return + + def get_metrics(self): + with RedisClient(self.host, self.port, self.auth) as client: + self.log_verbose('Connected to Redis at %s:%s' % (self.host, self.port)) + + self.dispatch_info(client) + self.dispatch_list_lengths(client) + + def log_verbose(self, msg): + if not self.verbose: + return + collectd.info('redis plugin [verbose]: %s' % msg) - return info def configure_callback(conf): @@ -166,11 +219,12 @@ def configure_callback(conf): auth = None instance = None llen_keys = {} + metric_types = {} + verbose = False for node in conf.children: key = node.key.lower() val = node.values[0] - log_verbose('Analyzing config %s key (value: %s)' % (key, val)) searchObj = re.search(r'redis_(.*)$', key, re.M | re.I) if key == 'host': @@ -180,73 +234,37 @@ def configure_callback(conf): elif key == 'auth': auth = val elif key == 'verbose': - global VERBOSE_LOGGING - VERBOSE_LOGGING = bool(node.values[0]) or VERBOSE_LOGGING + verbose = node.values[0] elif key == 'instance': instance = val - elif key == 'llen_key': + elif key == 'sendlistlength': if (len(node.values)) == 2: - llen_keys.setdefault(node.values[0], []).append(node.values[1]) + llen_keys.setdefault(int(node.values[0]), []).append(node.values[1]) else: collectd.warning("redis_info plugin: monitoring length of keys requires both \ database index and key value") elif searchObj: - global REDIS_INFO - log_verbose('Matching expression found: key: %s - value: %s' % - (searchObj.group(1), val)) - REDIS_INFO[searchObj.group(1), val] = True + metric_types[searchObj.group(1), val] = True else: collectd.warning('redis_info plugin: Unknown config key: %s.' % key) continue - log_verbose( - 'Configured with host=%s, port=%s, instance name=%s, using_auth=%s' - % (host, port, instance, (auth is not None))) + if verbose: + collectd.info('Configured with host=%s, port=%s, instance name=%s, using_auth=%s, llen_keys=%s' + % (host, port, instance, (auth is not None), llen_keys)) - CONFIGS.append({'host': host, - 'port': port, - 'auth': auth, - 'instance': instance, - 'llen_keys': llen_keys}) + collector = RedisCollector(**{ + 'host': host, + 'port': port, + 'auth': auth, + 'instance': instance, + 'metric_types': metric_types, + 'verbose': verbose, + 'llen_keys': llen_keys}) - -def dispatch_value(info, key, type, plugin_instance=None, type_instance=None): - """Read a key from info response data and dispatch a value""" - - if key not in info: - collectd.warning('redis_info plugin: Info key not found: %s' % key) - return - - if plugin_instance is None: - plugin_instance = 'unknown redis' - collectd.error( - 'redis_info plugin: plugin_instance is not set, Info key: %s' % - key) - - if not type_instance: - type_instance = key - - try: - value = int(info[key]) - except ValueError: - value = float(info[key]) - - log_verbose('Sending value: %s=%s' % (type_instance, value)) - - val = collectd.Values(plugin='redis_info') - val.type = type - val.type_instance = type_instance - val.plugin_instance = plugin_instance - val.values = [value] - - # With some versions of CollectD, a dummy metadata map must be added - # to each value for it to be correctly serialized to JSON by the - # write_http plugin. See - # https://github.com/collectd/collectd/issues/716 - val.meta = {'0': True} - val.dispatch() + collectd.register_read(collector.read_callback, name="%s:%s:%s" % (host, port, instance)) def _format_dimensions(dimensions): @@ -261,61 +279,13 @@ def _format_dimensions(dimensions): Returns: str: Comma-separated list of dimensions """ + if not dimensions: + return "" dim_pairs = ["%s=%s" % (k, v) for k, v in dimensions.iteritems()] return "[%s]" % (",".join(dim_pairs)) -def read_callback(): - for conf in CONFIGS: - get_metrics(conf) - - -def get_metrics(conf): - info = fetch_info(conf) - - if not info: - collectd.error('redis plugin: No info received') - return - - plugin_instance = conf['instance'] - if plugin_instance is None: - plugin_instance = '{host}:{port}'.format(host=conf['host'], - port=conf['port']) - - for keyTuple, val in REDIS_INFO.iteritems(): - key, val = keyTuple - - if key == 'total_connections_received' and val == 'counter': - dispatch_value(info, - 'total_connections_received', - 'counter', - plugin_instance, - 'connections_received') - elif key == 'total_commands_processed' and val == 'counter': - dispatch_value(info, - 'total_commands_processed', - 'counter', - plugin_instance, - 'commands_processed') - else: - dispatch_value(info, key, val, plugin_instance) - - for db, keys in conf['llen_keys'].items(): - for key in keys: - subkey = "db{0}_llen_{1}".format(db, key) - val = info.get(subkey) - dimensions = _format_dimensions({'key_name': key, 'db_index': db}) - plugin_dimensions = "{0}{1}".format(plugin_instance, dimensions) - dispatch_value(info, subkey, 'gauge', plugin_dimensions, 'key_llen') - - -def log_verbose(msg): - if not VERBOSE_LOGGING: - return - collectd.info('redis plugin [verbose]: %s' % msg) - # register callbacks collectd.register_config(configure_callback) -collectd.register_read(read_callback)