Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding SignalFX support #1

Merged
merged 3 commits into from
Jul 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 86 additions & 16 deletions elasticsearch.monitoring/fetch_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import sys
import logging
import numbers, types

import click
import requests
Expand Down Expand Up @@ -39,6 +40,22 @@ def delete_path(data, selector):
return
value = value[k]

def flatten_json(y):
out = {}
def flatten(x, name=''):
if type(x) is dict:
for a in x:
flatten(x[a], name + a + '.')
elif type(x) is list:
i = 0
for a in x:
flatten(a, name + str(i) + '.')
i += 1
else:
out[name[:-1]] = x
flatten(y)
return out

def assert_http_status(response, expected_status_code=200):
if response.status_code != expected_status_code:
print response.url, response.json()
Expand All @@ -48,7 +65,6 @@ def assert_http_status(response, expected_status_code=200):

# Elasticsearch cluster to monitor
elasticCluster = os.environ.get('ES_METRICS_CLUSTER_URL', 'http://localhost:9200/')
interval = int(os.environ.get('ES_METRICS_INTERVAL', '10'))

# Elasticsearch Cluster to Send metrics to
monitoringCluster = os.environ.get('ES_METRICS_MONITORING_CLUSTER_URL', 'http://localhost:9200/')
Expand Down Expand Up @@ -122,51 +138,102 @@ def fetch_index_stats():
pass

def create_templates():
# TODO verify and apply index templates here
for filename in os.listdir(os.path.join(working_dir, 'templates')):
if filename.endswith(".json"):
with open(os.path.join(working_dir, 'templates', filename)) as query_base:
template = query_base.read()
template = template.replace('{{INDEX_PREFIX}}', indexPrefix + '*').strip()
# print template
templates_response = requests.put(monitoringCluster + '_template/' + filename[:-5], data = template)
assert_http_status(templates_response)

def main():

def poll_metrics(cluster_host, monitor, monitor_host):
cluster_health, node_stats = get_all_data(cluster_host)
if monitor == 'elasticsearch':
into_elasticsearch(monitor_host, cluster_health, node_stats)
elif monitor == 'signalfx':
into_signalfx(monitor_host, cluster_health, node_stats)


def get_all_data(cluster_host):
cluster_health = fetch_cluster_health(cluster_host)
node_stats = fetch_nodes_stats(cluster_host)
# TODO generate cluster_state documents
return cluster_health, node_stats


def into_signalfx(sfx_key, cluster_health, node_stats):
import signalfx
sfx = signalfx.SignalFx()
ingest = sfx.ingest(sfx_key)
for node in node_stats:
source_node = node['source_node']
for s in node_stats_to_collect:
flattened = flatten_json(node['node_stats'][s])
for k,v in flattened.items():
if isinstance(v, (int, float)) and not isinstance(v, types.BooleanType):
ingest.send(gauges=[{"metric": 'elasticsearch.node.' + s + '.' + k, "value": v,
"dimensions": {
'cluster_uuid': node.get('cluster_uuid'),
'cluster_name': node.get('cluster_name'),
'node_name': source_node.get('name'),
'node_host': source_node.get('host'),
'node_host': source_node.get('ip'),
'node_uuid': source_node.get('uuid'),
'cluster_name': source_node.get('uuid'),
}
}])
ingest.stop()

def into_elasticsearch(monitor_host, cluster_health, node_stats):
utc_datetime = datetime.datetime.utcnow()
index_name = indexPrefix + str(utc_datetime.strftime('%Y.%m.%d'))

cluster_health = fetch_cluster_health(elasticCluster)
cluster_health_data = ['{"index":{"_index":"'+index_name+'","_type":"cluster_health"}}\n' + json.dumps(o)+'\n' for o in cluster_health]
# TODO generate cluster_state documents

node_stats = fetch_nodes_stats(elasticCluster)
node_stats_data = ['{"index":{"_index":"'+index_name+'","_type":"node_stats"}}\n' + json.dumps(o)+'\n' for o in node_stats]

data = node_stats_data + cluster_health_data
bulk_response = requests.post(monitoringCluster + index_name + '/_bulk', data = '\n'.join(data))
bulk_response = requests.post(monitor_host + index_name + '/_bulk', data = '\n'.join(data))
assert_http_status(bulk_response)
for item in bulk_response.json()["items"]:
if item.get("index") and item.get("index").get("status") != 201:
print json.dumps(item.get("index").get("error"))
click.echo(json.dumps(item.get("index").get("error")))


@click.command()
@click.option('--interval', default=10, help='Interval (in seconds) to run this')
@click.option('--index-prefix', default='.monitoring-es-2-', help='Index prefix for Elastic monitor')
@click.argument('monitor-host')
@click.argument('monitor', default='elasticsearch')
@click.argument('cluster-host', default='http://localhost:9200/')
def main(interval, cluster_host, monitor, monitor_host, index_prefix):
global cluster_uuid, indexPrefix

indexPrefix = index_prefix or indexPrefix

click.echo('Monitoring %s into %s at %s' % (cluster_host, monitor, monitor_host))

if __name__ == "__main__":
# TODO use click
response = requests.get(elasticCluster)
assert_http_status(response)
cluster_uuid = response.json()['cluster_uuid']
create_templates()
if monitor == 'elasticsearch':
create_templates()
elif monitor == 'signalfx':
import signalfx

recurring = True
recurring = interval > 0
if not recurring:
main()
poll_metrics(cluster_host, monitor, monitor_host)
else:
try:
nextRun = 0
while True:
if time.time() >= nextRun:
nextRun = time.time() + interval
now = time.time()
main()

poll_metrics(cluster_host, monitor, monitor_host)

elapsed = time.time() - now
print "Total Elapsed Time: %s" % elapsed
timeDiff = nextRun - time.time()
Expand All @@ -181,3 +248,6 @@ def main():
sys.exit(0)
except SystemExit:
os._exit(0)

if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
click
requests
signalfx