Skip to content

Commit

Permalink
Merge pull request #190 from pnuu/bugfix-s3filesystem-init
Browse files Browse the repository at this point in the history
Fix connection parameter handling
  • Loading branch information
mraspaud authored Feb 27, 2024
2 parents 010c0d4 + e158f3a commit 22e2dc8
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 9 deletions.
6 changes: 4 additions & 2 deletions examples/move_it_server.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
# The values defined in individual sections will override these settings.

# Path to SSH _private_ key for key-based identification for ssh transfers
ssh_key_filename = /home/username/.ssh/id_rsa
# Put this in connection_parameters dictionary by adding the prefix
connection_parameters__ssh_key_filename = /home/username/.ssh/id_rsa

# Set watchdog polling timeout (interval) in seconds.
# Only effective if "-w" commandline argument is given
Expand Down Expand Up @@ -34,7 +35,8 @@ topic = /1b/hrit-segment/0deg
# Do not delete the compressed file
delete = False
# Path to SSH key _private_ key used for transfers
# ssh_key_filename = /home/user/.ssh/id_rsa
# Put this in connection_parameters dictionary by adding the prefix
# connection_parameters__ssh_key_filename = /home/user/.ssh/id_rsa


[aapp-data-ears-pps-process]
Expand Down
21 changes: 21 additions & 0 deletions trollmoves/movers.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,27 @@ class S3Mover(Mover):
changing the filename. The new destination filename will be the last part
of the provided destination following the last slash ('/').
In the Trollmoves Server config, which is in .ini format, the connection parameters
and other dictionary-like items can be defined with douple underscore format::
connection_parameters__secret = secret
connection_parameters__client_kwargs__endpoint_url = https://endpoint.url
connection_parameters__client_kwargs__verify = false
will result in a nested dictionary item::
{
'connection_parameters': {
'secret': 'secret',
'client_kwargs': {
'endpoint_url': 'https://endpoint.url',
'verify': False
}
}
}
Note that boolean values are converted. Numeric values are handled where they are used.
"""

def copy(self):
Expand Down
51 changes: 50 additions & 1 deletion trollmoves/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import subprocess
import tempfile
import time
import warnings
from collections import deque
from configparser import ConfigParser
from contextlib import suppress
Expand Down Expand Up @@ -65,6 +66,8 @@
file_cache_lock = Lock()
START_TIME = datetime.datetime.utcnow()

CONNECTION_CONFIG_ITEMS = ["connection_uptime", "ssh_key_filename", "ssh_connection_timeout", "ssh_private_key_file"]


class RequestManager(Thread):
"""Manage requests."""
Expand Down Expand Up @@ -166,7 +169,8 @@ def _move_file(self, pathname, message, rel_path):
return_message = None
try:
destination = move_it(pathname, message.data['destination'],
self._attrs, rel_path=rel_path,
self._attrs["connection_parameters"],
rel_path=rel_path,
backup_targets=message.data.get('backup_targets', None))
message.data['destination'] = destination
except Exception as err:
Expand Down Expand Up @@ -568,6 +572,8 @@ def _read_ini_config(filename):
_parse_nameserver(res[section], cp_[section])
_parse_addresses(res[section])
_parse_delete(res[section], cp_[section])
res[section] = _create_config_sub_dicts(res[section])
res[section] = _form_connection_parameters_dict(res[section])
if not _check_origin_and_listen(res, section):
continue
if not _check_topic(res, section):
Expand Down Expand Up @@ -606,6 +612,49 @@ def _parse_delete(conf, raw_conf):
conf["delete"] = val


def _create_config_sub_dicts(original):
# Take a copy so we can modify the values if necessary
res = dict(original.items())
for key in original.keys():
parts = key.split("__")
if len(parts) > 1:
_create_dicts(res, parts, original[key])
del res[key]
return res


def _create_dicts(res, parts, val):
cur = res
for part in parts[:-1]:
if part not in cur:
cur[part] = {}
cur = cur[part]
cur[parts[-1]] = _check_bool(val)


def _check_bool(val):
if val.lower() in ["0", "false"]:
return False
elif val.lower() in ["1", "true"]:
return True
return val


def _form_connection_parameters_dict(original):
# Take a copy so we can modify the values if necessary
res = dict(original.items())
if "connection_parameters" not in res:
res["connection_parameters"] = {}
for key in original.keys():
if key in CONNECTION_CONFIG_ITEMS:
warnings.warn(
f"Consider using connection_parameters__{key} instead of {key}.",
category=UserWarning)
res["connection_parameters"][key] = original[key]
del res[key]
return res


def _check_origin_and_listen(res, section):
if ("origin" not in res[section]) and ('listen' not in res[section]):
LOGGER.warning("Incomplete section %s: add an 'origin' or 'listen' item.", section)
Expand Down
4 changes: 2 additions & 2 deletions trollmoves/tests/test_movers.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ def test_s3_copy_file_to_base_using_connection_parameters(S3FileSystem):
attrs = config['target-s3-example1']['connection_parameters']

s3_mover = _get_s3_mover(ORIGIN, "s3://data-bucket/", **attrs)
assert s3_mover.attrs['client_kwargs'] == {'endpoint_url': 'https://minio-server.mydomain.se:9000',
'verify': False}
assert s3_mover.attrs['client_kwargs'] == {
'endpoint_url': 'https://minio-server.mydomain.se:9000', 'verify': False}
assert s3_mover.attrs['secret'] == 'my-super-secret-key'
assert s3_mover.attrs['key'] == 'my-access-key'
assert s3_mover.attrs['use_ssl'] is True
Expand Down
46 changes: 42 additions & 4 deletions trollmoves/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,55 @@ def test_empty_init_arguments_does_not_crash_add(self):
Deleter(dict()).add('bla')


config_file = b"""
CONFIG_INI = b"""
[eumetcast-hrit-0deg]
origin = /local_disk/tellicast/received/MSGHRIT/H-000-{nominal_time:%Y%m%d%H%M}-{compressed:_<2s}
request_port = 9094
publisher_port = 9010
info = sensor=seviri;variant=0DEG
topic = /1b/hrit-segment/0deg
delete = False
# Everything below this should end up in connection_parameters dict
connection_uptime = 30
ssh_key_filename = id_rsa.pub
ssh_private_key_file = id_rsa
ssh_connection_timeout = 30
connection_parameters__secret = secret
connection_parameters__client_kwargs__endpoint_url = https://endpoint.url
connection_parameters__client_kwargs__verify = false
"""


def test_read_config_ini_with_dicts():
"""Test reading a config in ini format when dictionary values should be created."""
from trollmoves.server import read_config

with NamedTemporaryFile(suffix=".ini") as config_file:
config_file.write(CONFIG_INI)
config_file.flush()
with pytest.warns(UserWarning, match="Consider using connection_parameters__"):
config = read_config(config_file.name)
eumetcast = config["eumetcast-hrit-0deg"]
assert "origin" in eumetcast
assert "request_port" in eumetcast
assert "publisher_port" in eumetcast
assert "info" in eumetcast
assert "topic" in eumetcast
assert "delete" in eumetcast
expected_conn_params = {
"secret": "secret",
"client_kwargs": {
"endpoint_url": "https://endpoint.url",
"verify": False,
},
"connection_uptime": "30",
"ssh_key_filename": "id_rsa.pub",
"ssh_private_key_file": "id_rsa",
"ssh_connection_timeout": "30",
}
assert eumetcast["connection_parameters"] == expected_conn_params


class TestMoveItServer:
"""Test the move it server."""

Expand All @@ -207,7 +245,7 @@ def test_reloads_config_crashes_when_config_file_does_not_exist(self):
def test_reloads_config_on_example_config(self, fake_publisher):
"""Test that config can be reloaded with basic example."""
with NamedTemporaryFile() as temporary_config_file:
temporary_config_file.write(config_file)
temporary_config_file.write(CONFIG_INI)
config_filename = temporary_config_file.name
cmd_args = parse_args(["--port", "9999", config_filename])
server = MoveItServer(cmd_args)
Expand All @@ -218,7 +256,7 @@ def test_reloads_config_on_example_config(self, fake_publisher):
def test_reloads_config_calls_reload_config(self, mock_reload_config, mock_publisher):
"""Test that config file can be reloaded."""
with NamedTemporaryFile() as temporary_config_file:
temporary_config_file.write(config_file)
temporary_config_file.write(CONFIG_INI)
config_filename = temporary_config_file.name
cmd_args = parse_args(["--port", "9999", config_filename])
server = MoveItServer(cmd_args)
Expand All @@ -230,7 +268,7 @@ def test_reloads_config_calls_reload_config(self, mock_reload_config, mock_publi
def test_signal_reloads_config_calls_reload_config(self, mock_reload_config, mock_publisher):
"""Test that config file can be reloaded through signal."""
with NamedTemporaryFile() as temporary_config_file:
temporary_config_file.write(config_file)
temporary_config_file.write(CONFIG_INI)
config_filename = temporary_config_file.name
cmd_args = parse_args([config_filename])
client = MoveItServer(cmd_args)
Expand Down

0 comments on commit 22e2dc8

Please sign in to comment.