Skip to content

Commit

Permalink
Adding SignalFX support (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
synhershko authored Jul 5, 2017
1 parent 8536843 commit 7914952
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 16 deletions.
102 changes: 86 additions & 16 deletions elasticsearch.monitoring/fetch_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import sys
import logging
import numbers, types

import click
import requests
Expand Down Expand Up @@ -39,6 +40,22 @@ def delete_path(data, selector):
return
value = value[k]

def flatten_json(y):
out = {}
def flatten(x, name=''):
if type(x) is dict:
for a in x:
flatten(x[a], name + a + '.')
elif type(x) is list:
i = 0
for a in x:
flatten(a, name + str(i) + '.')
i += 1
else:
out[name[:-1]] = x
flatten(y)
return out

def assert_http_status(response, expected_status_code=200):
if response.status_code != expected_status_code:
print response.url, response.json()
Expand All @@ -48,7 +65,6 @@ def assert_http_status(response, expected_status_code=200):

# Elasticsearch cluster to monitor
elasticCluster = os.environ.get('ES_METRICS_CLUSTER_URL', 'http://localhost:9200/')
interval = int(os.environ.get('ES_METRICS_INTERVAL', '10'))

# Elasticsearch Cluster to Send metrics to
monitoringCluster = os.environ.get('ES_METRICS_MONITORING_CLUSTER_URL', 'http://localhost:9200/')
Expand Down Expand Up @@ -122,51 +138,102 @@ def fetch_index_stats():
pass

def create_templates():
    # TODO verify and apply index templates here
    """Upload every JSON index template found under templates/ to the
    monitoring cluster, substituting the configured index prefix."""
    template_dir = os.path.join(working_dir, 'templates')
    for fname in os.listdir(template_dir):
        if not fname.endswith(".json"):
            continue
        with open(os.path.join(template_dir, fname)) as handle:
            body = handle.read()
        # Inject the runtime index prefix into the template body.
        body = body.replace('{{INDEX_PREFIX}}', indexPrefix + '*').strip()
        # Template name is the filename minus its ".json" suffix.
        put_response = requests.put(monitoringCluster + '_template/' + fname[:-5], data=body)
        assert_http_status(put_response)

def main():

def poll_metrics(cluster_host, monitor, monitor_host):
    """Take one snapshot of cluster metrics and ship it to the chosen backend.

    monitor selects the destination: 'elasticsearch' or 'signalfx'; any
    other value makes this call a no-op.
    """
    health, stats = get_all_data(cluster_host)
    if monitor == 'signalfx':
        into_signalfx(monitor_host, health, stats)
    elif monitor == 'elasticsearch':
        into_elasticsearch(monitor_host, health, stats)


def get_all_data(cluster_host):
    """Fetch one round of monitoring documents from the target cluster.

    Returns a (cluster_health, node_stats) tuple.
    """
    # TODO generate cluster_state documents
    # Tuple elements evaluate left to right, preserving the original
    # health-then-stats fetch order.
    return fetch_cluster_health(cluster_host), fetch_nodes_stats(cluster_host)


def into_signalfx(sfx_key, cluster_health, node_stats):
    """Send numeric node-stats values to SignalFX as gauges.

    sfx_key is the SignalFX ingest token. cluster_health is accepted for
    interface symmetry with into_elasticsearch but is not shipped here —
    only node stats become gauges.
    """
    # Imported lazily so the dependency is only required for this backend.
    import signalfx
    sfx = signalfx.SignalFx()
    ingest = sfx.ingest(sfx_key)
    try:
        for node in node_stats:
            source_node = node['source_node']
            for section in node_stats_to_collect:
                flattened = flatten_json(node['node_stats'][section])
                for key, value in flattened.items():
                    # Gauges must be numeric; bool is an int subclass, so it
                    # must be excluded explicitly. (`bool` replaces the
                    # Python-2-only types.BooleanType.)
                    if not isinstance(value, (int, float)) or isinstance(value, bool):
                        continue
                    ingest.send(gauges=[{
                        "metric": 'elasticsearch.node.' + section + '.' + key,
                        "value": value,
                        "dimensions": {
                            'cluster_uuid': node.get('cluster_uuid'),
                            'cluster_name': node.get('cluster_name'),
                            'node_name': source_node.get('name'),
                            # BUG FIX: the original dict repeated the
                            # 'node_host' key (ip clobbered host) and the
                            # 'cluster_name' key (node uuid clobbered the
                            # cluster name). Each value now has its own key.
                            'node_host': source_node.get('host'),
                            'node_ip': source_node.get('ip'),
                            'node_uuid': source_node.get('uuid'),
                        },
                    }])
    finally:
        # Flush and shut down the ingest client even if sending fails.
        ingest.stop()

def into_elasticsearch(monitor_host, cluster_health, node_stats):
    """Bulk-index the collected documents into the monitoring cluster.

    monitor_host is the base URL of the monitoring Elasticsearch cluster;
    cluster_health and node_stats are the document lists produced by
    get_all_data(). Failed bulk items are echoed as JSON.
    """
    # Daily index, e.g. ".monitoring-es-2-2017.07.05".
    utc_datetime = datetime.datetime.utcnow()
    index_name = indexPrefix + str(utc_datetime.strftime('%Y.%m.%d'))

    # BUG FIX: use the documents handed in by the caller instead of
    # refetching them here (the old code fetched a second time and ignored
    # both parameters), and post exactly once, to monitor_host.
    cluster_health_data = ['{"index":{"_index":"' + index_name + '","_type":"cluster_health"}}\n' + json.dumps(o) + '\n'
                           for o in cluster_health]
    # TODO generate cluster_state documents
    node_stats_data = ['{"index":{"_index":"' + index_name + '","_type":"node_stats"}}\n' + json.dumps(o) + '\n'
                      for o in node_stats]

    data = node_stats_data + cluster_health_data
    bulk_response = requests.post(monitor_host + index_name + '/_bulk', data='\n'.join(data))
    assert_http_status(bulk_response)
    # Report any document that was not created (HTTP 201) by the bulk call.
    for item in bulk_response.json()["items"]:
        if item.get("index") and item.get("index").get("status") != 201:
            click.echo(json.dumps(item.get("index").get("error")))


@click.command()
@click.option('--interval', default=10, help='Interval (in seconds) to run this')
@click.option('--index-prefix', default='.monitoring-es-2-', help='Index prefix for Elastic monitor')
@click.argument('monitor-host')
@click.argument('monitor', default='elasticsearch')
@click.argument('cluster-host', default='http://localhost:9200/')
def main(interval, cluster_host, monitor, monitor_host, index_prefix):
    # CLI entry point: monitor CLUSTER-HOST and ship metrics to
    # MONITOR-HOST via the MONITOR backend ('elasticsearch' or 'signalfx').
    # NOTE(review): as captured here, main() only records the configuration
    # and echoes it; the polling loop appears as module-level script code
    # further down, and `interval` is unused in this body — this view may be
    # a truncated diff, so confirm against the full file.
    global cluster_uuid, indexPrefix

    # Fall back to the module-level default when no prefix is supplied.
    indexPrefix = index_prefix or indexPrefix

    click.echo('Monitoring %s into %s at %s' % (cluster_host, monitor, monitor_host))

if __name__ == "__main__":
# TODO use click
response = requests.get(elasticCluster)
assert_http_status(response)
cluster_uuid = response.json()['cluster_uuid']
create_templates()
if monitor == 'elasticsearch':
create_templates()
elif monitor == 'signalfx':
import signalfx

recurring = True
recurring = interval > 0
if not recurring:
main()
poll_metrics(cluster_host, monitor, monitor_host)
else:
try:
nextRun = 0
while True:
if time.time() >= nextRun:
nextRun = time.time() + interval
now = time.time()
main()

poll_metrics(cluster_host, monitor, monitor_host)

elapsed = time.time() - now
print "Total Elapsed Time: %s" % elapsed
timeDiff = nextRun - time.time()
Expand All @@ -181,3 +248,6 @@ def main():
sys.exit(0)
except SystemExit:
os._exit(0)

# Script entry point: delegate argument parsing and execution to the
# click command defined above.
if __name__ == '__main__':
    main()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
click
requests
signalfx

0 comments on commit 7914952

Please sign in to comment.