def flatten_json(y):
    """Flatten nested dicts and lists into one dict keyed by dotted paths.

    Dict keys and list indices are joined with '.', so
    ``{'a': {'b': [1, 2]}}`` becomes ``{'a.b.0': 1, 'a.b.1': 2}``.
    Empty containers produce no entries, and a non-container ``y`` is
    stored under the empty-string key.
    """
    out = {}

    def flatten(x, name=''):
        # isinstance (rather than an exact type match) also descends into
        # dict/list subclasses such as OrderedDict instead of emitting the
        # whole container as a single leaf value.
        if isinstance(x, dict):
            for key in x:
                flatten(x[key], name + key + '.')
        elif isinstance(x, list):
            for index, item in enumerate(x):
                flatten(item, name + str(index) + '.')
        else:
            # Strip the trailing '.' appended by the parent level.
            out[name[:-1]] = x

    flatten(y)
    return out


def create_templates():
    """PUT every ``*.json`` file in ``<working_dir>/templates`` as an index template.

    ``{{INDEX_PREFIX}}`` inside each file is replaced with
    ``indexPrefix + '*'``; the template is stored on ``monitoringCluster``
    under the file name without its ``.json`` extension. Each response is
    checked with ``assert_http_status``.
    """
    templates_dir = os.path.join(working_dir, 'templates')
    for filename in os.listdir(templates_dir):
        if not filename.endswith(".json"):
            continue
        with open(os.path.join(templates_dir, filename)) as query_base:
            template = query_base.read()
        template = template.replace('{{INDEX_PREFIX}}', indexPrefix + '*').strip()
        # filename[:-5] drops the ".json" suffix.
        templates_response = requests.put(
            monitoringCluster + '_template/' + filename[:-5], data=template)
        assert_http_status(templates_response)


def poll_metrics(cluster_host, monitor, monitor_host):
    """Collect one round of metrics from ``cluster_host`` and ship it.

    ``monitor`` selects the destination: ``'elasticsearch'`` or
    ``'signalfx'``. ``monitor_host`` is handed to the destination function
    as its first argument.

    Raises:
        ValueError: if ``monitor`` is not one of the supported names. The
            check runs before any data is fetched, so an unsupported
            monitor no longer results in metrics being collected and then
            silently discarded.
    """
    if monitor == 'elasticsearch':
        sink = into_elasticsearch
    elif monitor == 'signalfx':
        sink = into_signalfx
    else:
        raise ValueError(
            "Unsupported monitor %r (expected 'elasticsearch' or 'signalfx')"
            % (monitor,))

    cluster_health, node_stats = get_all_data(cluster_host)
    sink(monitor_host, cluster_health, node_stats)


def get_all_data(cluster_host):
    """Return ``(cluster_health, node_stats)`` fetched from ``cluster_host``."""
    cluster_health = fetch_cluster_health(cluster_host)
    node_stats = fetch_nodes_stats(cluster_host)
    # TODO generate cluster_state documents
    return cluster_health, node_stats


def _signalfx_dimensions(node):
    """Build the SignalFx dimensions identifying one node_stats document."""
    source_node = node['source_node']
    dimensions = {
        'cluster_uuid': node.get('cluster_uuid'),
        'cluster_name': node.get('cluster_name'),
        'node_name': source_node.get('name'),
        'node_host': source_node.get('host'),
        # Previously written under a second 'node_host' key, which silently
        # replaced the host value above.
        'node_ip': source_node.get('ip'),
        'node_uuid': source_node.get('uuid'),
    }
    # A dimension with no value adds nothing to the series identity.
    # NOTE(review): assumes the ingest client does not need explicit nulls.
    return dict((key, value) for key, value in dimensions.items()
                if value is not None)


def _signalfx_gauges(node):
    """Turn the numeric leaves of one node_stats document into gauge dicts."""
    dimensions = _signalfx_dimensions(node)
    gauges = []
    for section in node_stats_to_collect:
        flattened = flatten_json(node['node_stats'][section])
        for path, value in flattened.items():
            # bool is a subclass of int, so it has to be excluded explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            gauges.append({
                "metric": 'elasticsearch.node.' + section + '.' + path,
                "value": value,
                "dimensions": dict(dimensions),
            })
    return gauges


def into_signalfx(sfx_key, cluster_health, node_stats):
    """Send the numeric node statistics to SignalFx as gauges.

    Args:
        sfx_key: value passed to ``SignalFx().ingest(...)``.
        cluster_health: accepted so the signature matches
            ``into_elasticsearch``; not sent anywhere by this function.
        node_stats: iterable of node documents, each providing
            ``'source_node'`` and ``'node_stats'`` entries.
    """
    # Imported lazily so the dependency is only needed for this monitor.
    import signalfx

    ingest = signalfx.SignalFx().ingest(sfx_key)
    try:
        for node in node_stats:
            gauges = _signalfx_gauges(node)
            if gauges:
                ingest.send(gauges=gauges)
    finally:
        # Always stop the ingest client, even if a document was malformed.
        ingest.stop()


def _bulk_index_lines(index_name, doc_type, documents):
    """Return alternating action/source lines for the bulk API."""
    action = json.dumps({"index": {"_index": index_name, "_type": doc_type}})
    lines = []
    for document in documents:
        lines.append(action)
        lines.append(json.dumps(document))
    return lines


def into_elasticsearch(monitor_host, cluster_health, node_stats):
    """Bulk-index the collected documents into today's monitoring index.

    The index name is ``indexPrefix`` followed by the current UTC date as
    ``YYYY.MM.DD``. Node stats are written first, then cluster health.
    Items whose status is not 201 have their error echoed via ``click``.

    Args:
        monitor_host: base URL of the receiving cluster; the index name is
            appended directly, so it is expected to end with '/'.
        cluster_health: documents indexed with type ``cluster_health``.
        node_stats: documents indexed with type ``node_stats``.
    """
    index_name = indexPrefix + datetime.datetime.utcnow().strftime('%Y.%m.%d')

    lines = _bulk_index_lines(index_name, 'node_stats', node_stats)
    lines += _bulk_index_lines(index_name, 'cluster_health', cluster_health)
    if not lines:
        # Nothing was collected; do not send an empty bulk request.
        return

    # One JSON object per line, terminated by a final newline, with no blank
    # lines in between.
    body = '\n'.join(lines) + '\n'
    bulk_response = requests.post(
        monitor_host + index_name + '/_bulk',
        data=body,
        headers={'Content-Type': 'application/x-ndjson'})
    assert_http_status(bulk_response)

    for item in bulk_response.json()["items"]:
        result = item.get("index")
        if result and result.get("status") != 201:
            click.echo(json.dumps(result.get("error")))