Merge branch 'master' into 8.x
untergeek committed Jul 13, 2023
2 parents 63cb941 + 3e34412 commit a6f73db
Showing 14 changed files with 166 additions and 48 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -0,0 +1 @@
Dockerfile
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1
ARG PYVER=3.11.1
ARG PYVER=3.11.4
ARG ALPTAG=3.17
FROM python:${PYVER}-alpine${ALPTAG} as builder

Expand Down
2 changes: 1 addition & 1 deletion curator/_version.py
@@ -1,2 +1,2 @@
"""Curator Version"""
__version__ = '8.0.4'
__version__ = '8.0.5'
36 changes: 32 additions & 4 deletions curator/cli.py
@@ -1,6 +1,7 @@
"""Main CLI for Curator"""
import sys
import logging
import pathlib
import click
from es_client.builder import ClientArgs, OtherArgs
from es_client.helpers.utils import get_yaml, check_config, prune_nones, verify_url_schema
@@ -14,6 +15,30 @@
from curator.cli_singletons.utils import get_width
from curator._version import __version__

def configfile_callback(ctx, param, value):
"""Callback to validate whether the provided config file exists and is writeable
:param ctx: The click context
:param param: The click parameter object
:param value: The value of the parameter
:type ctx: Click context
:type param: Click object
:type value: Any
:returns: The config file path, or ``None`` if the file does not exist
:rtype: str or None
"""
logger = logging.getLogger(__name__)
logger.debug('Click ctx = %s', ctx)
logger.debug('Click param = %s', param)
logger.debug('Click value = %s', value)
path = pathlib.Path(value)
if path.is_file():
return value
logger.warning('Config file not found: %s', value)
return None

def override_logging(config, loglevel, logfile, logformat):
"""Get logging config and override from command-line options
@@ -238,7 +263,7 @@ def run(client_args, other_args, action_file, dry_run=False):

# pylint: disable=unused-argument, redefined-builtin
@click.command(context_settings=get_width())
@click.option('--config', help='Path to configuration file.', type=click.Path(exists=True), default=settings.config_file())
@click.option('--config', help='Path to configuration file.', type=str, default=settings.config_file(), callback=configfile_callback)
@click.option('--hosts', help='Elasticsearch URL to connect to', multiple=True)
@click.option('--cloud_id', help='Shorthand to connect to Elastic Cloud instance')
@click.option('--api_token', help='The base64 encoded API Key token', type=str)
@@ -281,9 +306,12 @@ def cli(
other_args = OtherArgs()
if config:
from_yaml = get_yaml(config)
raw_config = check_config(from_yaml)
client_args.update_settings(raw_config['client'])
other_args.update_settings(raw_config['other_settings'])
else:
# Use empty defaults.
from_yaml = {'elasticsearch': {'client': {}, 'other_settings': {}}, 'logging': {}}
raw_config = check_config(from_yaml)
client_args.update_settings(raw_config['client'])
other_args.update_settings(raw_config['other_settings'])

set_logging(check_logging_config(
{'logging': override_logging(from_yaml, loglevel, logfile, logformat)}))
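Taken together, the callback and the retyped ``--config`` option mean a missing default config file now degrades gracefully instead of raising at option parsing. A minimal standalone sketch of the same Click pattern follows; the fallback dict shape is taken from the diff above, while the command name, default path, and messages are illustrative assumptions:

import pathlib
import click

def configfile_callback(ctx, param, value):
    """Return the config path if the file exists; otherwise return None."""
    if value and pathlib.Path(value).is_file():
        return value
    click.echo(f'Config file not found, using empty defaults: {value}')
    return None

# type=str (rather than click.Path(exists=True)) so Click itself never raises
@click.command()
@click.option('--config', type=str, default='curator.yml', callback=configfile_callback)
def demo(config):
    if config:
        click.echo(f'Would load YAML config from {config}')
    else:
        # Mirrors the diff's fallback to empty defaults
        from_yaml = {'elasticsearch': {'client': {}, 'other_settings': {}}, 'logging': {}}
        click.echo(f'Using empty defaults: {from_yaml}')

if __name__ == '__main__':
    demo()
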
18 changes: 9 additions & 9 deletions curator/helpers/waiters.py
@@ -11,9 +11,9 @@

def health_check(client, **kwargs):
"""
This function calls `client.cluster.` :py:meth:`~.elasticsearch.client.ClusterClient.health` and, based on the
params provided, will return ``True`` or ``False`` depending on whether that particular keyword
appears in the output, and has the expected value.
This function calls `client.cluster.` :py:meth:`~.elasticsearch.client.ClusterClient.health`
and, based on the params provided, will return ``True`` or ``False`` depending on whether that
particular keyword appears in the output, and has the expected value.
If multiple keys are provided, all must match for a ``True`` response.
@@ -120,11 +120,11 @@ def restore_check(client, index_list):

def snapshot_check(client, snapshot=None, repository=None):
"""
This function calls `client.snapshot.` :py:meth:`~.elasticsearch.client.SnapshotClient.get` and tests to see
whether the snapshot is complete, and if so, with what status. It will log errors according
to the result. If the snapshot is still ``IN_PROGRESS``, it will return ``False``. ``SUCCESS``
will be an ``INFO`` level message, ``PARTIAL`` nets a ``WARNING`` message, ``FAILED`` is an
``ERROR`` message, and all others will be a ``WARNING`` level message.
This function calls `client.snapshot.` :py:meth:`~.elasticsearch.client.SnapshotClient.get` and
tests to see whether the snapshot is complete, and if so, with what status. It will log errors
according to the result. If the snapshot is still ``IN_PROGRESS``, it will return ``False``.
``SUCCESS`` will be an ``INFO`` level message, ``PARTIAL`` nets a ``WARNING`` message,
``FAILED`` is an ``ERROR`` message, and all others will be a ``WARNING`` level message.
:param client: A client connection object
:param snapshot: The snapshot name
@@ -200,7 +200,7 @@ def task_check(client, task_id=None):
descr = task['description']

if completed:
completion_time = ((running_time * 1000) + task['start_time_in_millis'])
completion_time = (running_time * 1000) + task['start_time_in_millis']
time_string = strftime('%Y-%m-%dT%H:%M:%SZ', localtime(completion_time/1000))
logger.info('Task "%s" completed at %s.', descr, time_string)
retval = True
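As the reflowed docstrings note, ``health_check`` returns ``True`` only when every supplied keyword matches the cluster health output, which makes it a natural polling predicate. A hedged usage sketch — the client URL is an assumption, while ``status`` and ``relocating_shards`` are genuine fields of the cluster health response:

import time
from elasticsearch8 import Elasticsearch
from curator.helpers.waiters import health_check

client = Elasticsearch('http://127.0.0.1:9200')

# All provided keys must match, so this waits for a fully settled cluster.
while not health_check(client, status='green', relocating_shards=0):
    time.sleep(5)
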
58 changes: 43 additions & 15 deletions curator/indexlist.py
@@ -72,8 +72,8 @@ def __get_indices(self):
if self.indices:
for index in self.indices:
self.__build_index_info(index)
self._get_metadata()
self._get_index_stats()
self.get_metadata()
self.get_index_stats()

def __build_index_info(self, index):
"""
@@ -113,7 +113,17 @@ def __map_method(self, ftype):
}
return methods[ftype]

def _get_index_stats(self):
def __remove_missing(self, err):
"""
Remove the missing index named in ``err`` from ``self.indices`` and return its name
"""
missing = err.info['error']['index']
self.loggit.warning('Index was initially present, but now is not: %s', missing)
self.loggit.debug('Removing %s from active IndexList', missing)
self.indices.remove(missing)
return missing

def get_index_stats(self):
"""
Populate ``index_info`` with index ``size_in_bytes``, ``primary_size_in_bytes`` and doc
count information for each index.
@@ -143,14 +153,28 @@ def iterate_over_stats(stats):
index_lists = chunk_index_list(working_list)
for lst in index_lists:
stats_result = {}
try:
stats_result.update(self._get_indices_stats(lst))
except TransportError as err:
if '413' in err.errors:
self.loggit.debug('Huge Payload 413 Error - Trying to get information with multiple requests')
stats_result = {}
stats_result.update(self._bulk_queries(lst, self._get_indices_stats))
iterate_over_stats(stats_result)
checking = True
while checking:
try:
stats_result.update(self._get_indices_stats(lst))
except NotFoundError as err:
lst.remove(self.__remove_missing(err))
continue
except TransportError as err:
if '413' in err.errors:
msg = (
'Huge Payload 413 Err - '
'Trying to get information via multiple requests'
)
self.loggit.debug(msg)
stats_result = {}
try:
stats_result.update(self._bulk_queries(lst, self._get_indices_stats))
except NotFoundError as err2:
lst.remove(self.__remove_missing(err2))
continue
iterate_over_stats(stats_result)
checking = False

def _get_indices_stats(self, data):
return self.client.indices.stats(index=to_csv(data), metric='store,docs')
@@ -171,7 +195,7 @@ def _bulk_queries(self, data, exec_func):
def _get_cluster_state(self, data):
return self.client.cluster.state(index=to_csv(data), metric='metadata')['metadata']['indices']

def _get_metadata(self):
def get_metadata(self):
"""
Populate ``index_info`` with index ``size_in_bytes`` and doc count information for each
index.
@@ -180,11 +204,15 @@ def _get_metadata(self):
self.empty_list_check()
for lst in chunk_index_list(self.indices):
working_list = {}
# The API called by _get_cluster_state doesn't suffer from the same problems that
# _get_indices_stats does. This won't result in an error if an index is suddenly
# missing.
try:
working_list.update(self._get_cluster_state(lst))
except TransportError as err:
if '413' in err.errors:
self.loggit.debug('Huge Payload 413 Error - Trying to get information with multiple requests')
msg = 'Huge Payload 413 Err - Trying to get information via multiple requests'
self.loggit.debug(msg)
working_list = {}
working_list.update(self._bulk_queries(lst, self._get_cluster_state))
if working_list:
@@ -194,8 +222,8 @@ def _get_metadata(self):
sii['age']['creation_date'] = (
fix_epoch(wli['settings']['index']['creation_date'])
)
sii['number_of_replicas'] = (wli['settings']['index']['number_of_replicas'])
sii['number_of_shards'] = (wli['settings']['index']['number_of_shards'])
sii['number_of_replicas'] = wli['settings']['index']['number_of_replicas']
sii['number_of_shards'] = wli['settings']['index']['number_of_shards']
sii['state'] = wli['state']
if 'routing' in wli['settings']['index']:
sii['routing'] = wli['settings']['index']['routing']
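The new ``while checking`` loop is the race-condition guard described in the changelog: any index that 404s mid-collection is pruned from the working list and the batch is retried. A minimal sketch of that prune-and-retry pattern outside Curator's classes — the function name is assumed, while the ``err.info`` shape and ``metric`` argument mirror the diff:

from elasticsearch8 import Elasticsearch
from elasticsearch8.exceptions import NotFoundError

def stats_with_prune(client: Elasticsearch, names: list) -> dict:
    """Collect index stats, dropping any index deleted since ``names`` was built."""
    results = {}
    while names:
        try:
            resp = client.indices.stats(index=','.join(names), metric='store,docs')
            results.update(resp['indices'])
            break
        except NotFoundError as err:
            missing = err.info['error']['index']  # the index that 404'd
            names.remove(missing)                 # prune it, then retry the rest
    return results
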
11 changes: 10 additions & 1 deletion curator/logtools.py
@@ -3,6 +3,7 @@
import json
import logging
import time
from pathlib import Path
import ecs_logging
from curator.exceptions import LoggingException

@@ -54,6 +55,11 @@ def deepmerge(source, destination):
destination[key] = value
return destination

def is_docker():
"""Check if we're running in a docker container"""
cgroup = Path('/proc/self/cgroup')
return Path('/.dockerenv').is_file() or (cgroup.is_file() and 'docker' in cgroup.read_text())

class LogstashFormatter(logging.Formatter):
"""Logstash formatting (JSON)"""
# The LogRecord attributes we want to carry over to the Logstash message,
@@ -126,7 +132,10 @@ def __init__(self, cfg):
raise ValueError(f"Invalid log level: {cfg['loglevel']}")

#: Attribute. Which logging handler to use
self.handler = logging.StreamHandler(stream=sys.stdout)
if is_docker():
self.handler = logging.FileHandler('/proc/1/fd/1')
else:
self.handler = logging.StreamHandler(stream=sys.stdout)
if cfg['logfile']:
self.handler = logging.FileHandler(cfg['logfile'])

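The reason for ``/proc/1/fd/1``: inside a container, PID 1's stdout is the stream that ``docker logs`` captures, so writing there keeps log lines visible even when Python is not PID 1. A condensed, standalone restatement of the detection and handler selection above (attaching to the root logger is an assumption for illustration):

import logging
import sys
from pathlib import Path

def is_docker():
    """Heuristic: the /.dockerenv marker file, or 'docker' in our cgroup entry."""
    cgroup = Path('/proc/self/cgroup')
    return Path('/.dockerenv').is_file() or (
        cgroup.is_file() and 'docker' in cgroup.read_text()
    )

# Route log output to PID 1's stdout in a container, our own stdout otherwise;
# an explicit logfile setting would still override this, as in the diff.
if is_docker():
    handler = logging.FileHandler('/proc/1/fd/1')
else:
    handler = logging.StreamHandler(stream=sys.stdout)
logging.getLogger().addHandler(handler)
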
24 changes: 24 additions & 0 deletions docs/Changelog.rst
@@ -3,6 +3,30 @@
Changelog
=========

8.0.5 (13 July 2023)
--------------------

**Announcements**

Release for Elasticsearch 8.8.2

**Changes**

* Small PEP formatting changes found while editing code.
* Bump Python version in Dockerfile to 3.11.4
* Bump Python dependency versions.
* Change ``targetName`` to ``target_name`` in ``setup.py`` for newest version
of cx_Freeze. Hat tip to ``@rene-dekker`` in #1681 who made these changes
to 5.x and 7.x.
* Fix command-line behavior so Curator no longer fails when the default config
file is not present. The newer CLI-based configuration allows running with no
config file at all.
* Initial work to prevent a race condition where an index is present at IndexList
initialization but missing by the time index stats collection begins. The
resulting 404s were causing Curator to shut down without completing its steps.
* When running in a Docker container, log to ``/proc/1/fd/1`` (the container's
stdout) by default when no value is provided for ``logfile``; an explicit
``logfile`` still takes precedence.

8.0.4 (28 April 2023)
---------------------

10 changes: 5 additions & 5 deletions docs/asciidoc/index.asciidoc
@@ -1,10 +1,10 @@
:curator_version: 8.0.4
:curator_version: 8.0.5
:curator_major: 8
:curator_doc_tree: 8.0
:es_py_version: 8.7.0
:es_doc_tree: 8.7
:stack_doc_tree: 8.7
:pybuild_ver: 3.11.3
:es_py_version: 8.8.2
:es_doc_tree: 8.8
:stack_doc_tree: 8.8
:pybuild_ver: 3.11.4
:copyright_years: 2011-2023
:ref: http://www.elastic.co/guide/en/elasticsearch/reference/{es_doc_tree}
:esref: http://www.elastic.co/guide/en/elasticsearch/reference/{stack_doc_tree}
4 changes: 2 additions & 2 deletions docs/conf.py
@@ -72,8 +72,8 @@

intersphinx_mapping = {
'python': ('https://docs.python.org/3.11', None),
'es_client': ('https://es-client.readthedocs.io/en/v8.7.0', None),
'elasticsearch8': ('https://elasticsearch-py.readthedocs.io/en/v8.7.0', None),
'es_client': ('https://es-client.readthedocs.io/en/v8.8.2', None),
'elasticsearch8': ('https://elasticsearch-py.readthedocs.io/en/v8.8.2', None),
'voluptuous': ('http://alecthomas.github.io/voluptuous/docs/_build/html', None),
'click': ('https://click.palletsprojects.com/en/8.1.x', None),
}
12 changes: 6 additions & 6 deletions pyproject.toml
@@ -28,13 +28,13 @@ keywords = [
'index-expiry'
]
dependencies = [
"elasticsearch8==8.7.0",
"es_client==8.7.0",
"ecs-logging==2.0.0",
"click==8.1.3",
"elasticsearch8==8.8.2",
"es_client==8.8.2",
"ecs-logging==2.0.2",
"click==8.1.4",
"pyyaml==6.0.0",
"voluptuous>=0.13.1",
"certifi>=2022.12.7",
"certifi>=2023.5.7",
"six>=1.16.0",
]

@@ -103,7 +103,7 @@ run = "run-coverage --no-cov"

[[tool.hatch.envs.test.matrix]]
python = ["3.9", "3.10", "3.11"]
version = ["8.0.4"]
version = ["8.0.5"]

[tool.pytest.ini_options]
pythonpath = [".", "curator"]
6 changes: 3 additions & 3 deletions setup.py
@@ -6,8 +6,8 @@

setup(
executables=[
Executable("run_curator.py", base=base, targetName="curator"),
Executable("run_singleton.py", base=base, targetName="curator_cli"),
Executable("run_es_repo_mgr.py", base=base, targetName="es_repo_mgr"),
Executable("run_curator.py", base=base, target_name="curator"),
Executable("run_singleton.py", base=base, target_name="curator_cli"),
Executable("run_es_repo_mgr.py", base=base, target_name="es_repo_mgr"),
]
)
2 changes: 1 addition & 1 deletion tests/integration/test_cli.py
@@ -19,7 +19,7 @@ def test_cli_client_config(self):
self.create_indices(10)
self.write_config(self.args['configfile'], testvars.bad_client_config.format(HOST))
self.write_config(self.args['actionfile'], testvars.disabled_proto.format('close', 'delete_indices'))
self.invoke_runner_alt(hosts='http://127.0.0.1:9200', loglevel='DEBUG', logformat='ecs', logfile=self.args['configfile'])
self.invoke_runner_alt(hosts='http://127.0.0.1:9200', loglevel='DEBUG', logformat='ecs')
assert 0 == self.result.exit_code
def test_cli_unreachable_cloud_id(self):
self.create_indices(10)
28 changes: 28 additions & 0 deletions tests/integration/test_integrations.py
@@ -1,8 +1,11 @@
"""Test integrations"""
# pylint: disable=missing-function-docstring, missing-class-docstring, line-too-long
import os
import logging
import pytest
from curator.exceptions import ConfigurationError
from curator.helpers.getters import get_indices
from curator import IndexList
from . import CuratorTestCase
from . import testvars

@@ -69,3 +72,28 @@ def test_field_stats_skips_empty_index(self):
self.invoke_runner()
# It should skip deleting 'zero', as it has 0 docs
assert [zero] == get_indices(self.client)

class TestIndexList(CuratorTestCase):
"""Test some of the IndexList particulars using a live ES instance/cluster"""
IDX1 = 'dummy1'
IDX2 = 'dummy2'
IDX3 = 'my_index'

@pytest.fixture(autouse=True)
def inject_fixtures(self, caplog):
# pylint: disable=attribute-defined-outside-init
self._caplog = caplog
def test_get_index_stats_with_404(self):
"""Check to ensure that index_stats are being collected if one index is missing"""
expected = f'Index was initially present, but now is not: {self.IDX2}'
self.create_index(self.IDX1)
self.create_index(self.IDX2)
self.create_index(self.IDX3)
ilo = IndexList(self.client)
assert ilo.indices == [self.IDX1, self.IDX2, self.IDX3]
self.client.indices.delete(index=f'{self.IDX1},{self.IDX2}')
with self._caplog.at_level(logging.WARNING):
ilo.get_index_stats()
# Guarantee we're getting the expected WARNING level message
assert self._caplog.records[-1].message == expected
assert ilo.indices == [self.IDX3]
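The ``inject_fixtures`` trick above is worth noting: pytest's ``caplog`` fixture is not directly reachable from ``unittest``-style test classes, so an autouse fixture stashes it on the instance. A self-contained sketch of the same pattern (class name and log message are illustrative):

import logging
import pytest

class TestWithCaplog:
    @pytest.fixture(autouse=True)
    def inject_fixtures(self, caplog):
        # Runs before every test method, exposing caplog as self._caplog
        self._caplog = caplog

    def test_warning_is_captured(self):
        with self._caplog.at_level(logging.WARNING):
            logging.getLogger(__name__).warning('boom')
        assert self._caplog.records[-1].message == 'boom'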
