diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py index d89e246..ab5bca7 100644 --- a/prometheus_es_exporter/__init__.py +++ b/prometheus_es_exporter/__init__.py @@ -1,5 +1,6 @@ import click import click_config_file +import concurrent.futures import configparser import glob import json @@ -428,6 +429,9 @@ def conv(value): 'in filename order. ' 'Can be absolute, or relative to the current working directory. ' '(default: ./config)') +@click.option('--threads', type=click.IntRange(min=1), default=1, + help='Enables concurrent query execution using the number of threads specified. ' + '(default: 1)') @click.option('--cluster-health-disable', default=False, is_flag=True, help='Disable cluster health monitoring.') @click.option('--cluster-health-timeout', default=10.0, @@ -509,6 +513,11 @@ def cli(**options): '--indices-stats-mode must be "indices" for ' '--indices-stats-indices to be used.') + executor = None + num_threads = options['threads'] + if num_threads > 1: + executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) + log_handler = logging.StreamHandler() log_format = '[%(asctime)s] %(name)s.%(levelname)s %(threadName)s %(message)s' formatter = JogFormatter(log_format) if options['json_logging'] else logging.Formatter(log_format) @@ -571,7 +580,7 @@ def cli(**options): if queries: for query_name, (interval, timeout, indices, query, on_error, on_missing) in queries.items(): - schedule_job(scheduler, interval, + schedule_job(scheduler, executor, interval, run_query, es_client, query_name, indices, query, timeout, on_error, on_missing) else: diff --git a/prometheus_es_exporter/scheduler.py b/prometheus_es_exporter/scheduler.py index 3f2609f..9a62db1 100644 --- a/prometheus_es_exporter/scheduler.py +++ b/prometheus_es_exporter/scheduler.py @@ -4,7 +4,7 @@ log = logging.getLogger(__name__) -def schedule_job(scheduler, interval, func, *args, **kwargs): +def schedule_job(scheduler, executor, interval, func, *args, **kwargs): """ Schedule a function to be run on a fixed interval. @@ -12,10 +12,16 @@ def schedule_job(scheduler, interval, func, *args, **kwargs): """ def scheduled_run(scheduled_time, *args, **kwargs): - try: - func(*args, **kwargs) - except Exception: - log.exception('Error while running scheduled job.') + def run_func(func, *args, **kwargs): + try: + func(*args, **kwargs) + except Exception: + log.exception('Error while running scheduled job.') + + if executor is not None: + executor.submit(run_func, func, *args, **kwargs) + else: + run_func(func, *args, **kwargs) current_time = time.monotonic() next_scheduled_time = scheduled_time + interval