Skip to content

Commit

Permalink
Merge pull request #81 from hemanthkini/master
Browse files Browse the repository at this point in the history
Add concurrent query execution support
  • Loading branch information
braedon committed Sep 27, 2020
2 parents 6e2e492 + 8208ec9 commit d3c865c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
11 changes: 10 additions & 1 deletion prometheus_es_exporter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import click
import click_config_file
import concurrent.futures
import configparser
import glob
import json
Expand Down Expand Up @@ -428,6 +429,9 @@ def conv(value):
'in filename order. '
'Can be absolute, or relative to the current working directory. '
'(default: ./config)')
@click.option('--threads', type=click.IntRange(min=1), default=1,
help='Enables concurrent query execution using the number of threads specified. '
'(default: 1)')
@click.option('--cluster-health-disable', default=False, is_flag=True,
help='Disable cluster health monitoring.')
@click.option('--cluster-health-timeout', default=10.0,
Expand Down Expand Up @@ -509,6 +513,11 @@ def cli(**options):
'--indices-stats-mode must be "indices" for '
'--indices-stats-indices to be used.')

executor = None
num_threads = options['threads']
if num_threads > 1:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)

log_handler = logging.StreamHandler()
log_format = '[%(asctime)s] %(name)s.%(levelname)s %(threadName)s %(message)s'
formatter = JogFormatter(log_format) if options['json_logging'] else logging.Formatter(log_format)
Expand Down Expand Up @@ -571,7 +580,7 @@ def cli(**options):
if queries:
for query_name, (interval, timeout, indices, query,
on_error, on_missing) in queries.items():
schedule_job(scheduler, interval,
schedule_job(scheduler, executor, interval,
run_query, es_client, query_name, indices, query,
timeout, on_error, on_missing)
else:
Expand Down
16 changes: 11 additions & 5 deletions prometheus_es_exporter/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@
log = logging.getLogger(__name__)


def schedule_job(scheduler, interval, func, *args, **kwargs):
def schedule_job(scheduler, executor, interval, func, *args, **kwargs):
"""
Schedule a function to be run on a fixed interval.
Works with schedulers from the stdlib sched module.
"""

def scheduled_run(scheduled_time, *args, **kwargs):
try:
func(*args, **kwargs)
except Exception:
log.exception('Error while running scheduled job.')
def run_func(func, *args, **kwargs):
try:
func(*args, **kwargs)
except Exception:
log.exception('Error while running scheduled job.')

if executor is not None:
executor.submit(run_func, func, *args, **kwargs)
else:
run_func(func, *args, **kwargs)

current_time = time.monotonic()
next_scheduled_time = scheduled_time + interval
Expand Down

0 comments on commit d3c865c

Please sign in to comment.