From ba0fddb8665e8f72b456df0b80e3ae138b5d46de Mon Sep 17 00:00:00 2001
From: Hemanth Kini
Date: Wed, 16 Sep 2020 12:47:55 -0700
Subject: [PATCH 1/7] Add concurrent query execution support

This commit adds the ability to concurrently execute queries to the
exporter. If initialized with --threads NUM_THREADS, the exporter
schedules queries to be executed by a ThreadPoolExecutor using up to
that many threads.

Tested locally on my machine with an Elasticsearch cluster.
---
 prometheus_es_exporter/__init__.py  | 22 ++++++++++++++++++++--
 prometheus_es_exporter/scheduler.py | 18 ++++++++++++------
 prometheus_es_exporter/utils.py     |  6 +++++-
 3 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py
index d89e246..5b5f517 100644
--- a/prometheus_es_exporter/__init__.py
+++ b/prometheus_es_exporter/__init__.py
@@ -7,6 +7,8 @@
 import os
 import sched
 import time
+import concurrent.futures
+import threading
 
 from elasticsearch import Elasticsearch
 from elasticsearch.exceptions import ConnectionTimeout
@@ -32,7 +34,7 @@
 }
 
 METRICS_BY_QUERY = {}
-
+METRICS_BY_QUERY_LOCK = threading.Lock()
 
 
 def collector_up_gauge(name_list, description, succeeded=True):
@@ -194,7 +196,9 @@ def collect(self):
         # as it may be updated by other threads.
         # (only first level - lower levels are replaced
         # wholesale, so don't worry about them)
+        METRICS_BY_QUERY_LOCK.acquire()
         query_metrics = METRICS_BY_QUERY.copy()
+        METRICS_BY_QUERY_LOCK.release()
 
         for metric_dict in query_metrics.values():
             yield from gauge_generator(metric_dict)
@@ -214,6 +218,7 @@ def run_query(es_client, query_name, indices, query,
 
         # If this query has successfully run before, we need to handle any
         # metrics produced by that previous run.
+        METRICS_BY_QUERY_LOCK.acquire()
         if query_name in METRICS_BY_QUERY:
             old_metric_dict = METRICS_BY_QUERY[query_name]
 
@@ -231,10 +236,12 @@
                                            zero_missing=True)
 
             METRICS_BY_QUERY[query_name] = metric_dict
+            METRICS_BY_QUERY_LOCK.release()
 
     else:
         # If this query has successfully run before, we need to handle any
         # missing metrics.
+        METRICS_BY_QUERY_LOCK.acquire()
         if query_name in METRICS_BY_QUERY:
             old_metric_dict = METRICS_BY_QUERY[query_name]
 
@@ -250,6 +257,7 @@
                                            zero_missing=True)
 
             METRICS_BY_QUERY[query_name] = metric_dict
+            METRICS_BY_QUERY_LOCK.release()
 
 
 # Based on click.Choice
@@ -428,6 +436,8 @@ def conv(value):
           'in filename order. '
           'Can be absolute, or relative to the current working directory. '
           '(default: ./config)')
+@click.option('--threads',
+              help='Enables concurrent query execution using the number of threads specified. If the number of threads is less than 1, throws an error.')
 @click.option('--cluster-health-disable', default=False, is_flag=True,
               help='Disable cluster health monitoring.')
 @click.option('--cluster-health-timeout', default=10.0,
@@ -509,6 +519,14 @@ def cli(**options):
                                    '--indices-stats-mode must be "indices" for '
                                    '--indices-stats-indices to be used.')
 
+    executor = None
+    if options['threads']:
+        num_threads = int(options['threads'])
+        if num_threads < 1:
+            raise click.BadOptionUsage('threads',
+                                       '--threads must specify a number of threads greater than 0.')
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
+
     log_handler = logging.StreamHandler()
     log_format = '[%(asctime)s] %(name)s.%(levelname)s %(threadName)s %(message)s'
     formatter = JogFormatter(log_format) if options['json_logging'] else logging.Formatter(log_format)
@@ -571,7 +589,7 @@
 
     if queries:
         for query_name, (interval, timeout, indices, query, on_error, on_missing) in queries.items():
-            schedule_job(scheduler, interval,
+            schedule_job(scheduler, executor, interval,
                          run_query, es_client, query_name, indices, query,
                          timeout, on_error, on_missing)
     else:
diff --git a/prometheus_es_exporter/scheduler.py b/prometheus_es_exporter/scheduler.py
index 3f2609f..3dfef08 100644
--- a/prometheus_es_exporter/scheduler.py
+++ b/prometheus_es_exporter/scheduler.py
@@ -1,10 +1,10 @@
 import time
 import logging
+import concurrent.futures
 
 log = logging.getLogger(__name__)
 
-
-def schedule_job(scheduler, interval, func, *args, **kwargs):
+def schedule_job(scheduler, executor, interval, func, *args, **kwargs):
     """
     Schedule a function to be run on a fixed interval.
 
@@ -12,10 +12,16 @@ def schedule_job(scheduler, interval, func, *args, **kwargs):
     """
 
     def scheduled_run(scheduled_time, *args, **kwargs):
-        try:
-            func(*args, **kwargs)
-        except Exception:
-            log.exception('Error while running scheduled job.')
+        def run_func(func, *args, **kwargs):
+            try:
+                func(*args, **kwargs)
+            except Exception:
+                log.exception('Error while running scheduled job.')
+
+        if executor != None:
+            executor.submit(lambda *args, **kwargs: run_func(func, *args, **kwargs), *args, **kwargs)
+        else:
+            run_func(func, *args, **kwargs)
 
         current_time = time.monotonic()
         next_scheduled_time = scheduled_time + interval
diff --git a/prometheus_es_exporter/utils.py b/prometheus_es_exporter/utils.py
index f1dd81a..66506aa 100644
--- a/prometheus_es_exporter/utils.py
+++ b/prometheus_es_exporter/utils.py
@@ -52,7 +52,8 @@ def wrapper(*args, **kwargs):
 
     return decorator
 
-def nice_shutdown(shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
+def nice_shutdown(executor=None,
+                  shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
     """
     Logs shutdown signals nicely.
 
@@ -64,6 +65,9 @@ def nice_shutdown(shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
     def sig_handler(signum, _):
         log.info('Received signal %(signal)s.',
                  {'signal': signal.Signals(signum).name})
+        # Finish ThreadPoolExecutor execution cleanly, if using threading.
+        if executor != None:
+            executor.shutdown()
         # Raise SystemExit to bypass (most) try/except blocks.
         sys.exit()

From 752ba78589b82de09715a79f00cc40175e906c52 Mon Sep 17 00:00:00 2001
From: Hemanth Kini
Date: Wed, 16 Sep 2020 12:47:55 -0700
Subject: [PATCH 2/7] Add concurrent query execution support

This commit adds the ability to concurrently execute queries to the
exporter. If initialized with --threads NUM_THREADS, the exporter
schedules queries to be executed by a ThreadPoolExecutor using up to
that many threads.

Tested locally on my machine with an Elasticsearch cluster.
---
 prometheus_es_exporter/__init__.py  | 22 ++++++++++++++++++++--
 prometheus_es_exporter/scheduler.py | 17 +++++++++++------
 prometheus_es_exporter/utils.py     |  6 +++++-
 3 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py
index d89e246..5b5f517 100644
--- a/prometheus_es_exporter/__init__.py
+++ b/prometheus_es_exporter/__init__.py
@@ -7,6 +7,8 @@
 import os
 import sched
 import time
+import concurrent.futures
+import threading
 
 from elasticsearch import Elasticsearch
 from elasticsearch.exceptions import ConnectionTimeout
@@ -32,7 +34,7 @@
 }
 
 METRICS_BY_QUERY = {}
-
+METRICS_BY_QUERY_LOCK = threading.Lock()
 
 
 def collector_up_gauge(name_list, description, succeeded=True):
@@ -194,7 +196,9 @@ def collect(self):
         # as it may be updated by other threads.
         # (only first level - lower levels are replaced
         # wholesale, so don't worry about them)
+        METRICS_BY_QUERY_LOCK.acquire()
         query_metrics = METRICS_BY_QUERY.copy()
+        METRICS_BY_QUERY_LOCK.release()
 
         for metric_dict in query_metrics.values():
             yield from gauge_generator(metric_dict)
@@ -214,6 +218,7 @@ def run_query(es_client, query_name, indices, query,
 
         # If this query has successfully run before, we need to handle any
        # metrics produced by that previous run.
+        METRICS_BY_QUERY_LOCK.acquire()
         if query_name in METRICS_BY_QUERY:
             old_metric_dict = METRICS_BY_QUERY[query_name]
 
@@ -231,10 +236,12 @@
                                            zero_missing=True)
 
             METRICS_BY_QUERY[query_name] = metric_dict
+            METRICS_BY_QUERY_LOCK.release()
 
     else:
         # If this query has successfully run before, we need to handle any
         # missing metrics.
+        METRICS_BY_QUERY_LOCK.acquire()
         if query_name in METRICS_BY_QUERY:
             old_metric_dict = METRICS_BY_QUERY[query_name]
 
@@ -250,6 +257,7 @@
                                            zero_missing=True)
 
             METRICS_BY_QUERY[query_name] = metric_dict
+            METRICS_BY_QUERY_LOCK.release()
 
 
 # Based on click.Choice
@@ -428,6 +436,8 @@ def conv(value):
           'in filename order. '
           'Can be absolute, or relative to the current working directory. '
           '(default: ./config)')
+@click.option('--threads',
+              help='Enables concurrent query execution using the number of threads specified. If the number of threads is less than 1, throws an error.')
 @click.option('--cluster-health-disable', default=False, is_flag=True,
               help='Disable cluster health monitoring.')
 @click.option('--cluster-health-timeout', default=10.0,
@@ -509,6 +519,14 @@ def cli(**options):
                                    '--indices-stats-mode must be "indices" for '
                                    '--indices-stats-indices to be used.')
 
+    executor = None
+    if options['threads']:
+        num_threads = int(options['threads'])
+        if num_threads < 1:
+            raise click.BadOptionUsage('threads',
+                                       '--threads must specify a number of threads greater than 0.')
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
+
     log_handler = logging.StreamHandler()
     log_format = '[%(asctime)s] %(name)s.%(levelname)s %(threadName)s %(message)s'
     formatter = JogFormatter(log_format) if options['json_logging'] else logging.Formatter(log_format)
@@ -571,7 +589,7 @@
 
     if queries:
         for query_name, (interval, timeout, indices, query, on_error, on_missing) in queries.items():
-            schedule_job(scheduler, interval,
+            schedule_job(scheduler, executor, interval,
                          run_query, es_client, query_name, indices, query,
                          timeout, on_error, on_missing)
     else:
diff --git a/prometheus_es_exporter/scheduler.py b/prometheus_es_exporter/scheduler.py
index 3f2609f..29ca9d4 100644
--- a/prometheus_es_exporter/scheduler.py
+++ b/prometheus_es_exporter/scheduler.py
@@ -3,8 +3,7 @@
 log = logging.getLogger(__name__)
 
-
-def schedule_job(scheduler, interval, func, *args, **kwargs):
+def schedule_job(scheduler, executor, interval, func, *args, **kwargs):
     """
     Schedule a function to be run on a fixed interval.
 
@@ -12,10 +11,16 @@ def schedule_job(scheduler, interval, func, *args, **kwargs):
     """
 
     def scheduled_run(scheduled_time, *args, **kwargs):
-        try:
-            func(*args, **kwargs)
-        except Exception:
-            log.exception('Error while running scheduled job.')
+        def run_func(func, *args, **kwargs):
+            try:
+                func(*args, **kwargs)
+            except Exception:
+                log.exception('Error while running scheduled job.')
+
+        if executor != None:
+            executor.submit(lambda *args, **kwargs: run_func(func, *args, **kwargs), *args, **kwargs)
+        else:
+            run_func(func, *args, **kwargs)
 
         current_time = time.monotonic()
         next_scheduled_time = scheduled_time + interval
diff --git a/prometheus_es_exporter/utils.py b/prometheus_es_exporter/utils.py
index f1dd81a..66506aa 100644
--- a/prometheus_es_exporter/utils.py
+++ b/prometheus_es_exporter/utils.py
@@ -52,7 +52,8 @@ def wrapper(*args, **kwargs):
 
     return decorator
 
-def nice_shutdown(shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
+def nice_shutdown(executor=None,
+                  shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
     """
     Logs shutdown signals nicely.
 
@@ -64,6 +65,9 @@ def nice_shutdown(shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
     def sig_handler(signum, _):
         log.info('Received signal %(signal)s.',
                  {'signal': signal.Signals(signum).name})
+        # Finish ThreadPoolExecutor execution cleanly, if using threading.
+        if executor != None:
+            executor.shutdown()
         # Raise SystemExit to bypass (most) try/except blocks.
         sys.exit()

From e02a3bef3f6b510f06cbe56fbf259c7b3c4000fe Mon Sep 17 00:00:00 2001
From: Hemanth Kini
Date: Thu, 24 Sep 2020 23:15:06 -0700
Subject: [PATCH 3/7] Update prometheus_es_exporter/__init__.py

Co-authored-by: Braedon Vickers
---
 prometheus_es_exporter/__init__.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py
index 5b5f517..d7f75d8 100644
--- a/prometheus_es_exporter/__init__.py
+++ b/prometheus_es_exporter/__init__.py
@@ -436,8 +436,9 @@ def conv(value):
           'in filename order. '
           'Can be absolute, or relative to the current working directory. '
           '(default: ./config)')
-@click.option('--threads',
-              help='Enables concurrent query execution using the number of threads specified. If the number of threads is less than 1, throws an error.')
+@click.option('--threads', type=click.IntRange(min=1), default=1,
+              help='Enables concurrent query execution using the number of threads specified. '
+                   '(default: 1))
 @click.option('--cluster-health-disable', default=False, is_flag=True,
               help='Disable cluster health monitoring.')
 @click.option('--cluster-health-timeout', default=10.0,

From aac417161cbec1225b9d87dc10b07b0f0410c459 Mon Sep 17 00:00:00 2001
From: Hemanth Kini
Date: Thu, 24 Sep 2020 23:16:35 -0700
Subject: [PATCH 4/7] Update prometheus_es_exporter/scheduler.py

Co-authored-by: Braedon Vickers
---
 prometheus_es_exporter/scheduler.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/prometheus_es_exporter/scheduler.py b/prometheus_es_exporter/scheduler.py
index 3dfef08..c9450ce 100644
--- a/prometheus_es_exporter/scheduler.py
+++ b/prometheus_es_exporter/scheduler.py
@@ -18,8 +18,8 @@ def run_func(func, *args, **kwargs):
             except Exception:
                 log.exception('Error while running scheduled job.')
 
-        if executor != None:
-            executor.submit(lambda *args, **kwargs: run_func(func, *args, **kwargs), *args, **kwargs)
+        if executor is not None:
+            executor.submit(run_func, func, *args, **kwargs)
         else:
             run_func(func, *args, **kwargs)
 

From a86953eb543e573721c5fb24e7e7fb8e32a17db2 Mon Sep 17 00:00:00 2001
From: Hemanth Kini
Date: Thu, 24 Sep 2020 23:22:56 -0700
Subject: [PATCH 5/7] Update prometheus_es_exporter/__init__.py

Co-authored-by: Braedon Vickers
---
 prometheus_es_exporter/__init__.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py
index d7f75d8..8099f3d 100644
--- a/prometheus_es_exporter/__init__.py
+++ b/prometheus_es_exporter/__init__.py
@@ -521,11 +521,8 @@ def cli(**options):
                                    '--indices-stats-indices to be used.')
 
     executor = None
-    if options['threads']:
-        num_threads = int(options['threads'])
-        if num_threads < 1:
-            raise click.BadOptionUsage('threads',
-                                       '--threads must specify a number of threads greater than 0.')
+    num_threads = options['threads']
+    if num_threads > 1:
         executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
 
     log_handler = logging.StreamHandler()

From 4951fbff60afff353a2705785aef5f7abc7e1a2a Mon Sep 17 00:00:00 2001
From: Braedon Vickers
Date: Fri, 25 Sep 2020 14:54:55 +0800
Subject: [PATCH 6/7] Update prometheus_es_exporter/__init__.py

---
 prometheus_es_exporter/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py
index 8099f3d..86678e6 100644
--- a/prometheus_es_exporter/__init__.py
+++ b/prometheus_es_exporter/__init__.py
@@ -438,7 +438,7 @@ def conv(value):
           '(default: ./config)')
 @click.option('--threads', type=click.IntRange(min=1), default=1,
               help='Enables concurrent query execution using the number of threads specified. '
-                   '(default: 1))
+                   '(default: 1)')
 @click.option('--cluster-health-disable', default=False, is_flag=True,
               help='Disable cluster health monitoring.')
 @click.option('--cluster-health-timeout', default=10.0,

From 8208ec986beee13be3f64b766e87fef6d872b523 Mon Sep 17 00:00:00 2001
From: Hemanth Kini
Date: Sat, 26 Sep 2020 10:25:23 -0700
Subject: [PATCH 7/7] Cleanup changes requested by maintainer

The maintainer requested that I drop locking and remove unused imports,
so this commit implements that.
---
 prometheus_es_exporter/__init__.py  | 7 -------
 prometheus_es_exporter/scheduler.py | 1 -
 prometheus_es_exporter/utils.py     | 7 ++-----
 3 files changed, 2 insertions(+), 13 deletions(-)

diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py
index 86678e6..4122cf9 100644
--- a/prometheus_es_exporter/__init__.py
+++ b/prometheus_es_exporter/__init__.py
@@ -34,7 +34,6 @@
 }
 
 METRICS_BY_QUERY = {}
-METRICS_BY_QUERY_LOCK = threading.Lock()
 
 
 def collector_up_gauge(name_list, description, succeeded=True):
@@ -196,9 +195,7 @@ def collect(self):
         # as it may be updated by other threads.
         # (only first level - lower levels are replaced
         # wholesale, so don't worry about them)
-        METRICS_BY_QUERY_LOCK.acquire()
         query_metrics = METRICS_BY_QUERY.copy()
-        METRICS_BY_QUERY_LOCK.release()
 
         for metric_dict in query_metrics.values():
             yield from gauge_generator(metric_dict)
@@ -218,7 +215,6 @@ def run_query(es_client, query_name, indices, query,
 
         # If this query has successfully run before, we need to handle any
         # metrics produced by that previous run.
-        METRICS_BY_QUERY_LOCK.acquire()
         if query_name in METRICS_BY_QUERY:
             old_metric_dict = METRICS_BY_QUERY[query_name]
 
@@ -236,12 +232,10 @@ def run_query(es_client, query_name, indices, query,
                                            zero_missing=True)
 
             METRICS_BY_QUERY[query_name] = metric_dict
-            METRICS_BY_QUERY_LOCK.release()
 
     else:
         # If this query has successfully run before, we need to handle any
         # missing metrics.
-        METRICS_BY_QUERY_LOCK.acquire()
         if query_name in METRICS_BY_QUERY:
             old_metric_dict = METRICS_BY_QUERY[query_name]
 
@@ -257,7 +251,6 @@ def run_query(es_client, query_name, indices, query,
                                            zero_missing=True)
 
             METRICS_BY_QUERY[query_name] = metric_dict
-            METRICS_BY_QUERY_LOCK.release()
 
 
 # Based on click.Choice
diff --git a/prometheus_es_exporter/scheduler.py b/prometheus_es_exporter/scheduler.py
index c9450ce..e62a295 100644
--- a/prometheus_es_exporter/scheduler.py
+++ b/prometheus_es_exporter/scheduler.py
@@ -1,6 +1,5 @@
 import time
 import logging
-import concurrent.futures
 
 log = logging.getLogger(__name__)
 
diff --git a/prometheus_es_exporter/utils.py b/prometheus_es_exporter/utils.py
index 66506aa..bc8407c 100644
--- a/prometheus_es_exporter/utils.py
+++ b/prometheus_es_exporter/utils.py
@@ -52,8 +52,7 @@ def wrapper(*args, **kwargs):
 
     return decorator
 
-def nice_shutdown(executor=None,
-                  shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
+def nice_shutdown(shutdown_signals=(signal.SIGINT, signal.SIGTERM)):
     """
     Logs shutdown signals nicely.
 
@@ -65,9 +64,7 @@ def nice_shutdown(executor=None,
     def sig_handler(signum, _):
         log.info('Received signal %(signal)s.',
                  {'signal': signal.Signals(signum).name})
-        # Finish ThreadPoolExecutor execution cleanly, if using threading.
-        if executor != None:
-            executor.shutdown()
+
         # Raise SystemExit to bypass (most) try/except blocks.
         sys.exit()
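
Note for reviewers: the standalone sketch below shows the scheduling pattern this series converges on after review (executor.submit(run_func, ...) guarded by "is not None"), wired up to a dummy query function so it can be run on its own. It is an illustration of the approach, not code from the exporter: the fake_query function, the worker and interval numbers, and the catch-up/re-scheduling loop at the bottom of scheduled_run are assumptions filled in here, since the patches only show the top of that function.

    import concurrent.futures
    import logging
    import sched
    import time

    log = logging.getLogger(__name__)


    def schedule_job(scheduler, executor, interval, func, *args, **kwargs):
        """Run func every `interval` seconds, on the executor if one is given."""

        def scheduled_run(scheduled_time, *args, **kwargs):
            def run_func(func, *args, **kwargs):
                try:
                    func(*args, **kwargs)
                except Exception:
                    log.exception('Error while running scheduled job.')

            if executor is not None:
                # Hand the job to the thread pool so a slow query does not
                # block the scheduler loop or the other queries.
                executor.submit(run_func, func, *args, **kwargs)
            else:
                # Original single-threaded behaviour.
                run_func(func, *args, **kwargs)

            # Assumed re-scheduling logic: skip any runs missed while busy,
            # then queue the next run at an absolute monotonic time.
            current_time = time.monotonic()
            next_scheduled_time = scheduled_time + interval
            while next_scheduled_time < current_time:
                next_scheduled_time += interval
            scheduler.enterabs(next_scheduled_time, 1, scheduled_run,
                               (next_scheduled_time, *args), kwargs)

        start_time = time.monotonic()
        scheduler.enterabs(start_time, 1, scheduled_run, (start_time, *args), kwargs)


    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO)

        def fake_query(name):
            log.info('running query %s', name)
            time.sleep(2)  # stand-in for a slow Elasticsearch request

        scheduler = sched.scheduler()
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

        for name in ('q1', 'q2', 'q3'):
            schedule_job(scheduler, executor, 5, fake_query, name)

        try:
            scheduler.run()
        except KeyboardInterrupt:
            executor.shutdown()

With the executor, the three jobs above overlap instead of running back to back. Submitting run_func directly (rather than wrapping it in a lambda) keeps the call site simple, and because exceptions are caught and logged inside the worker, a failing query cannot escape the thread pool or take down the scheduler loop.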