Skip to content

Commit

Permalink
[6.14.z] CSV formatting fixes (#15822)
Browse files Browse the repository at this point in the history
fixes (#15822)
  • Loading branch information
tpapaioa committed Aug 2, 2024
1 parent c0c16b6 commit 32e774d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 31 deletions.
42 changes: 19 additions & 23 deletions robottelo/cli/hammer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import json
import re

from robottelo.config import logger


def _normalize(header):
"""Replace empty spaces with '-' and lower all chars"""
Expand Down Expand Up @@ -35,31 +37,25 @@ def _normalize_obj(obj):
return obj


def is_csv(output):
"""Verifies if the output string is eligible for converting into CSV"""
sniffer = csv.Sniffer()
try:
sniffer.sniff(output)
return True
except csv.Error:
return False
def parse_csv(output):
"""Parse CSV output from Hammer CLI and return a Python dictionary."""

# https://projects.theforeman.org/issues/37264
NON_CSV_PATTERN = r'\d+ task\(s\), \d+ success, \d+ fail'

def parse_csv(output):
"""Parse CSV output from Hammer CLI and convert it to python dictionary."""
# ignore warning about puppet and ostree deprecation
output.replace('Puppet and OSTree will no longer be supported in Katello 3.16\n', '')
is_rex = 'Job invocation' in output
is_pkg_list = 'Nvra' in output
# Validate if the output is eligible for CSV conversions else return as it is
if not is_csv(output) and not is_rex and not is_pkg_list:
return output
output = output.splitlines()[0:2] if is_rex else output.splitlines()
reader = csv.reader(output)
# Generate the key names, spaces will be converted to dashes "-"
keys = [_normalize(header) for header in next(reader)]
# For each entry, create a dict mapping each key with each value
return [dict(zip(keys, values, strict=True)) for values in reader if len(values) > 0]
output, num_changes = re.subn(NON_CSV_PATTERN, '', output)
if num_changes > 0:
logger.warning(f'Removed output from CLI based on regex: {NON_CSV_PATTERN!s}')

output = output.splitlines()

# Normalize the column names to use when generating the dictionary
try:
keys = [_normalize(header) for header in next(csv.reader(output))]
return [value for value in csv.DictReader(output[1:], fieldnames=keys)]
except csv.Error as err:
logger.error(f'Exception while parsing CSV output {output}: {err}')
raise


def parse_help(output):
Expand Down
12 changes: 6 additions & 6 deletions tests/foreman/cli/test_ldapauthsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_positive_refresh_usergroup_with_ad(self, member_group, ad_data, module_
based on user-sync
"""
ad_data = ad_data()
LOGEDIN_MSG = "Using configured credentials for user '{0}'."
LOGGEDIN_MSG = "Using configured credentials for user '{0}'."
auth_source = module_target_sat.cli_factory.ldap_auth_source(
{
'name': gen_string('alpha'),
Expand Down Expand Up @@ -134,7 +134,7 @@ def test_positive_refresh_usergroup_with_ad(self, member_group, ad_data, module_
result = module_target_sat.cli.Auth.with_user(
username=ad_data['ldap_user_name'], password=ad_data['ldap_user_passwd']
).status()
assert LOGEDIN_MSG.format(ad_data['ldap_user_name']) in result[0]['message']
assert LOGGEDIN_MSG.format(ad_data['ldap_user_name']) in result[0]['message']
module_target_sat.cli.UserGroupExternal.refresh(
{'user-group-id': user_group['id'], 'name': member_group}
)
Expand Down Expand Up @@ -221,7 +221,7 @@ def test_usergroup_sync_with_refresh(self, default_ipa_host, module_target_sat):
ipa_group_base_dn = default_ipa_host.group_base_dn.replace('foobargroup', 'foreman_group')
member_username = 'foreman_test'
member_group = 'foreman_group'
LOGEDIN_MSG = "Using configured credentials for user '{0}'."
LOGGEDIN_MSG = "Using configured credentials for user '{0}'."
auth_source_name = gen_string('alpha')
auth_source = module_target_sat.cli_factory.ldap_auth_source(
{
Expand Down Expand Up @@ -262,7 +262,7 @@ def test_usergroup_sync_with_refresh(self, default_ipa_host, module_target_sat):
result = module_target_sat.cli.Auth.with_user(
username=member_username, password=default_ipa_host.ldap_user_passwd
).status()
assert LOGEDIN_MSG.format(member_username) in result[0]['message']
assert LOGGEDIN_MSG.format(member_username) in result[0]['message']
with pytest.raises(CLIReturnCodeError) as error:
module_target_sat.cli.Role.with_user(
username=member_username, password=default_ipa_host.ldap_user_passwd
Expand Down Expand Up @@ -308,7 +308,7 @@ def test_usergroup_with_usergroup_sync(self, default_ipa_host, module_target_sat
ipa_group_base_dn = default_ipa_host.group_base_dn.replace('foobargroup', 'foreman_group')
member_username = 'foreman_test'
member_group = 'foreman_group'
LOGEDIN_MSG = "Using configured credentials for user '{0}'."
LOGGEDIN_MSG = "Using configured credentials for user '{0}'."
auth_source_name = gen_string('alpha')
auth_source = module_target_sat.cli_factory.ldap_auth_source(
{
Expand Down Expand Up @@ -349,7 +349,7 @@ def test_usergroup_with_usergroup_sync(self, default_ipa_host, module_target_sat
result = module_target_sat.cli.Auth.with_user(
username=member_username, password=default_ipa_host.ldap_user_passwd
).status()
assert LOGEDIN_MSG.format(member_username) in result[0]['message']
assert LOGGEDIN_MSG.format(member_username) in result[0]['message']
list = module_target_sat.cli.Role.with_user(
username=member_username, password=default_ipa_host.ldap_user_passwd
).list()
Expand Down
4 changes: 2 additions & 2 deletions tests/foreman/destructive/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from robottelo.config import settings
from robottelo.constants import HAMMER_CONFIG

LOGEDIN_MSG = "Session exists, currently logged in as '{0}'"
LOGGEDIN_MSG = "Session exists, currently logged in as '{0}'"
password = gen_string('alpha')
pytestmark = pytest.mark.destructive

Expand All @@ -45,5 +45,5 @@ def test_positive_password_reset(target_sat):
{'username': settings.server.admin_username, 'password': reset_password}
)
result = target_sat.cli.Auth.with_user().status()
assert LOGEDIN_MSG.format(settings.server.admin_username) in result.split("\n")[1]
assert LOGGEDIN_MSG.format(settings.server.admin_username) in result[0]['message']
assert target_sat.cli.Org.with_user().list()

0 comments on commit 32e774d

Please sign in to comment.