diff --git a/README.md b/README.md index ff6ae44470..35d26c8938 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Due to unintuitive behavior with `sys.path` as expected. One can run the script like this instead: ```bash -PYTHONPATH=./src python -m subscription_manager.scripts.subscription_manager +PYTHONPATH=./src:./syspurpose/src python -m subscription_manager.scripts.subscription_manager ``` Similar for other bin scripts: diff --git a/src/subscription_manager/managercli.py b/src/subscription_manager/managercli.py index 15fbf681f5..ba736dfd48 100644 --- a/src/subscription_manager/managercli.py +++ b/src/subscription_manager/managercli.py @@ -169,6 +169,10 @@ _("System Type:") ] +SP_CONFLICT_MESSAGE = _("Due to a conflicting change made at the server the " + "{attr} has not been set.\n{advice}") +SP_ADVICE = _("If you'd like to overwrite the server side change please run: {command}") + def handle_exception(msg, ex): @@ -519,6 +523,168 @@ def main(self, args=None): raise ge +class SyspurposeCommand(CliCommand): + """ + Abstract command for manipulating an attribute of system purpose. + """ + + def __init__(self, name, shortdesc=None, primary=False, attr=None, commands=('set', 'unset')): + super(SyspurposeCommand, self).__init__(name, shortdesc=shortdesc, primary=primary) + self.commands = commands + self.attr = attr + + if 'set' in commands: + self.parser.add_option( + "--set", + dest="set", + help=(_("Set {attr} of system purpose".format(attr=attr))) + ) + if 'unset' in commands: + self.parser.add_option( + "--unset", + dest="unset", + action="store_true", + help=(_("Unset {attr} of system purpose".format(attr=attr))) + ) + if 'add' in commands: + self.parser.add_option( + "--add", + dest="to_add", + action="append", + default=[], + help=_("Add an item to the list ({attr}).".format(attr=attr)) + ) + if 'remove' in commands: + self.parser.add_option( + "--remove", + dest="to_remove", + action="append", + default=[], + help=_("Remove an item from the list ({attr}).".format(attr=attr)) + ) + + def _validate_options(self): + set = getattr(self.options, 'set', None) + unset = getattr(self.options, 'unset', None) + to_add = getattr(self.options, 'to_add', None) + to_remove = getattr(self.options, 'to_remove', None) + + if set: + self.options.set = self.options.set.strip() + if to_add: + self.options.to_add = [x.strip() for x in self.options.to_add if isinstance(x, str)] + if to_remove: + self.options.to_remove = [x.strip() for x in self.options.to_remove + if isinstance(x, str)] + + if (set or to_add or to_remove) and unset: + system_exit(os.EX_USAGE, _("--unset cannot be used with --set, --add, or --remove")) + if to_add and to_remove: + system_exit(os.EX_USAGE, _("--add cannot be used with --remove")) + + def set(self): + self._set(self.options.set) + success_msg = "{attr} set to \"{val}\".".format(attr=self.attr, val=self.options.set) + self._check_result( + expectation=lambda res: res.get(self.attr) == self.options.set, + success_msg=success_msg, + command="subscription-manager {name} --set {val}".format(name=self.name, + val=self.options.set), + attr=self.attr + ) + + def _set(self, to_set): + raise NotImplementedError("To be implemented in subclasses") + + def unset(self): + self._unset() + success_msg = _("{attr} unset.").format(attr=self.attr) + self._check_result( + expectation=lambda res: res.get(self.attr) in ["", None, []], + success_msg=success_msg, + command="subscription-manager {name} --unset".format(name=self.name), + attr=self.attr + ) + + def _unset(self): + self._set("") + + def add(self): + self._add(self.options.to_add) + success_msg = _("{attr} updated.").format(attr=self.name) + to_add = "--add " + "--add ".join(self.options.to_add) + command = "subscription-manager {name} ".format(name=self.name) + to_add + self._check_result( + expectation=lambda res: all(x in res.get('addons', []) for x in self.options.to_add), + success_msg=success_msg, + command=command, + attr=self.attr + ) + + def _add(self, to_add): + raise NotImplementedError("To be implemented in subclasses") + + def remove(self): + self._remove(self.options.to_remove) + success_msg = _("{attr} updated.").format(attr=self.name.capitalize()) + to_remove = "--remove " + "--remove ".join(self.options.to_remove) + command = "subscription-manager {name} ".format(name=self.name) + to_remove + self._check_result( + expectation=lambda res: all(x not in res.get('addons', []) for x in self.options.to_remove), + success_msg=success_msg, + command=command, + attr=self.attr + ) + + def _remove(self, to_remove): + raise NotImplementedError("To be implemented in subclasses") + + def show(self): + if self.is_registered(): + syspurpose = self.sync() + else: + syspurpose = syspurposelib.read_syspurpose() + if syspurpose is not None and self.attr in syspurpose and syspurpose[self.attr]: + val = syspurpose[self.attr] + values = val if not isinstance(val, list) else ", ".join(val) + print(_("Current {name}: {val}".format(name=self.name.capitalize(), + val=values))) + else: + print(_("{name} not set.".format(name=self.name.capitalize()))) + + def sync(self): + return syspurposelib.SyspurposeSyncActionCommand().perform(include_result=True)[1] + + def _do_command(self): + self._validate_options() + + if getattr(self.options, 'unset', None): + self.unset() + elif getattr(self.options, 'set', None): + self.set() + elif hasattr(self.options, 'to_add') and len(self.options.to_add) > 0: + self.add() + elif hasattr(self.options, 'to_remove') and len(self.options.to_remove) > 0: + self.remove() + else: + self.show() + + def check_syspurpose_support(self, attr): + if self.is_registered() and not self.cp.has_capability('syspurpose'): + print(_("Note: The currently configured entitlement server does not support System Purpose {attr}.".format(attr=attr))) + + def _check_result(self, expectation, success_msg, command, attr): + result = None + if self.is_registered(): + result = syspurposelib.SyspurposeSyncActionCommand().perform(include_result=True)[1] + + if result and not expectation(result): + advice = SP_ADVICE.format(command=command) + system_exit(os.EX_SOFTWARE, msgs=_(SP_CONFLICT_MESSAGE.format(attr=attr, advice=advice))) + else: + print(_(success_msg)) + + class UserPassCommand(CliCommand): """ @@ -843,7 +1009,7 @@ def _do_command(self): self._toggle(self.options.enable or False) -class ServiceLevelCommand(OrgCommand): +class ServiceLevelCommand(SyspurposeCommand, OrgCommand): def __init__(self): @@ -853,35 +1019,31 @@ def __init__(self): False) self._add_url_options() - self.parser.add_option("--show", dest="show", action='store_true', - help=_("show this system's current service level")) - self.parser.add_option("--list", dest="list", action='store_true', - help=_("list all service levels available")) - self.parser.add_option("--set", dest="service_level", - help=_("service level to apply to this system")) - self.parser.add_option("--unset", dest="unset", - action='store_true', - help=_("unset the service level for this system")) + self.parser.add_option( + "--show", + dest="show", + action='store_true', + help=_("show this system's current service level") + ) + self.parser.add_option( + "--list", + dest="list", + action='store_true', + help=_("list all service levels available") + ) self.identity = inj.require(inj.IDENTITY) - def _set_service_level(self, service_level): - consumer = self.cp.getConsumer(self.identity.uuid) - if 'serviceLevel' not in consumer: - system_exit(os.EX_UNAVAILABLE, _("Error: The service-level command is not supported by the server.")) - self.cp.updateConsumer(self.identity.uuid, service_level=service_level) - save_sla_to_syspurpose_metadata(service_level) - def _validate_options(self): - if self.options.service_level: - self.options.service_level = self.options.service_level.strip() + if self.options.set: + self.options.set = self.options.set.strip() # Assume --show if run with no args: if not self.options.list and \ not self.options.show and \ - not self.options.service_level and \ - not self.options.service_level == "" and \ + not self.options.set and \ + not self.options.set == "" and \ not self.options.unset: self.options.show = True @@ -907,36 +1069,26 @@ def _do_command(self): else: # get an UEP as consumer self.cp = self.cp_provider.get_consumer_auth_cp() - - if self.options.unset: - self.unset_service_level() - - if self.options.service_level is not None: - self.set_service_level(self.options.service_level) - - if self.options.show: - self.show_service_level() - - if self.options.list: - self.list_service_levels() - except connection.RestlibException as re: log.exception(re) log.error(u"Error: Unable to retrieve service levels: %s" % re) system_exit(os.EX_SOFTWARE, re.msg) except Exception as e: handle_exception(_("Error: Unable to retrieve service levels."), e) - - def set_service_level(self, service_level): - if service_level == "": - self.unset_service_level() else: - self._set_service_level(service_level) - print(_("Service level set to: %s") % service_level) + if self.options.unset: + self.unset() + elif self.options.set is not None: + self.set() + elif self.options.list: + self.list_service_levels() + elif self.options.show: + self.show_service_level() + else: + self.show_service_level() - def unset_service_level(self): - self._set_service_level("") - print(_("Service level preference has been unset")) + def _set(self, service_level): + save_sla_to_syspurpose_metadata(service_level) def show_service_level(self): consumer = self.cp.getConsumer(self.identity.uuid) @@ -975,78 +1127,20 @@ def list_service_levels(self): raise e -class UsageCommand(UserPassCommand): +class UsageCommand(SyspurposeCommand): def __init__(self): shortdesc = _("Manage usage setting for this system") self._org_help_text = _("use set and unset to define the value for this field") - super(UsageCommand, self).__init__("usage", shortdesc, - False) - - self.parser.add_option("--set", dest="usage", - help=_("usage setting to apply to this system")) - self.parser.add_option("--unset", dest="unset", - action='store_true', - help=_("unset the usage setting for this system")) + super(UsageCommand, self).__init__("usage", shortdesc, False, attr='usage', + commands=('set', 'unset')) - self.identity = inj.require(inj.IDENTITY) - - def _set_usage(self, usage): - consumer = self.cp.getConsumer(self.identity.uuid) - if 'usage' not in consumer: - system_exit(os.EX_UNAVAILABLE, _("Error: The usage command is not supported by the server.")) - self.cp.updateConsumer(self.identity.uuid, usage=usage) + def _set(self, usage): save_usage_to_syspurpose_metadata(usage) - def _validate_options(self): - - if self.options.usage: - self.options.usage = self.options.usage.strip() - - if self.options.usage and self.options.unset: - system_exit(os.EX_USAGE, _("Error: --set cannot be used with --unset")) - - def _do_command(self): - self._validate_options() - - # If we have a username/password, we're going to use that, otherwise - # we'll use the identity certificate. We already know one or the other - # exists: - if self.options.username and self.options.password: - self.cp_provider.set_user_pass(self.username, self.password) - self.cp = self.cp_provider.get_basic_auth_cp() - else: - # get an UEP as consumer - self.cp = self.cp_provider.get_consumer_auth_cp() - - if self.options.unset: - self.unset_usage() - elif self.options.usage is not None: - self.set_usage(self.options.usage) - else: - self.show_usage() - - def set_usage(self, usage): - if usage == "": - self.unset_usage() - else: - self._set_usage(usage) - print(_("Usage set to: %s") % usage) - - def unset_usage(self): - self._set_usage("") - print(_("Usage setting has been unset")) - - def show_usage(self): - consumer = self.cp.getConsumer(self.identity.uuid) - if 'usage' not in consumer: - system_exit(os.EX_UNAVAILABLE, _("Error: The usage command is not supported by the server.")) - usage = consumer['usage'] or "" - if usage: - print(_("Current usage setting: %s") % usage) - else: - print(_("Usage setting not set")) + def _unset(self): + self._set("") class RegisterCommand(UserPassCommand): @@ -1398,48 +1492,24 @@ def _do_command(self): print(_("System has been unregistered.")) -class AddonsCommand(CliCommand): +class AddonsCommand(SyspurposeCommand): def __init__(self): shortdesc = _("Modify or view the addons attribute of the system purpose") - super(AddonsCommand, self).__init__("addons", shortdesc=shortdesc, primary=False) - - self.parser.add_option("--add", dest="to_add", action="append", default=[], - help=_("Add an addon to the list.")) - self.parser.add_option("--remove", dest="to_remove", action="append", default=[], - help=_("Remove an addon from the list.")) - self.parser.add_option("--unset", dest="unset", action="store_true", - help=_("Remove all addons from the list"), default=False) + super(AddonsCommand, self).__init__("addons", shortdesc=shortdesc, primary=False, + attr='addons', commands=['unset', 'add', + 'remove']) - def _validate_options(self): - if self.options.unset and (self.options.to_add or self.options.to_remove): - system_exit(os.EX_USAGE, _("Error: --unset cannot be used with either --add or --remove")) - if self.options.to_add and self.options.to_remove: - system_exit(os.EX_USAGE, _("Error: --add cannot be used with --remove")) - - def _do_command(self): - self._validate_options() + def _unset(self): + syspurposelib.set("addons", []) + syspurposelib.write() - if not self.options.unset and not self.options.to_add and not self.options.to_remove: - addons = syspurposelib.read_syspurpose().get("addons") - if addons: - if isinstance(addons, list): - print(_("Current addons: %s" % ", ".join(addons))) - else: - print(_("Current addons: %s" % addons)) - else: - print(_("This system does not have any system purpose addons specified.")) - return + def _remove(self, val): + syspurposelib.remove_all("addons", val) + syspurposelib.write() - if self.options.unset: - syspurposelib.unset("addons") - print(_("Addons unset.")) - elif self.options.to_add: - syspurposelib.add_all("addons", self.options.to_add) - print(_("Addons added: %s" % ", ".join(self.options.to_add))) - elif self.options.to_remove: - syspurposelib.remove_all("addons", self.options.to_remove) - print(_("Addons removed: %s" % ", ".join(self.options.to_remove))) + def _add(self, val): + syspurposelib.add_all("addons", val) syspurposelib.write() @@ -2708,45 +2778,19 @@ def _list(self, all_overrides, specific_repos): print(columnize(names, echo_columnize_callback, *values, indent=2) + "\n") -class RoleCommand(CliCommand): +class RoleCommand(SyspurposeCommand): def __init__(self): shortdesc = _("Modify system purpose role") - super(RoleCommand, self).__init__("role", shortdesc, primary=False) - self.parser.add_option("--set", dest="set_role", - help=(_("Set role of system purpose"))) - self.parser.add_option("--unset", dest="unset_role", action="store_true", - help=(_("Unset role of system purpose"))) + super(RoleCommand, self).__init__("role", shortdesc, primary=False, attr='role', + commands=['set', 'unset']) def _validate_options(self): - if self.options.set_role and self.options.unset_role: + self.check_syspurpose_support('role') + if self.options.set and self.options.unset: system_exit(os.EX_USAGE, _("Error: Options --set and --unset of role subcommand are mutually exclusive.")) - def _set_role(self, role): - ret = save_role_to_syspurpose_metadata(role) - if ret: - print(_("System Purpose role has been set to: %s") % role) - else: - print(_("Error: System Purpose role has NOT been set to: %s") % role) - - def _unset_role(self): - ret = save_role_to_syspurpose_metadata(None) - if ret: - print(_("System Purpose role has been unset")) - else: - print(_("Error: System Purpose role has NOT been unset")) - - def _do_command(self): - self._validate_options() - if self.options.set_role: - self._set_role(self.options.set_role) - elif self.options.unset_role: - self._unset_role() - else: - sys_purpose_contents = syspurposelib.read_syspurpose() - if 'role' in sys_purpose_contents and sys_purpose_contents['role'] is not None: - print(_("System purpose role: %s") % sys_purpose_contents['role']) - else: - print(_("This system does not have any system purpose role")) + def _set(self, val): + save_role_to_syspurpose_metadata(val) class VersionCommand(CliCommand): diff --git a/src/subscription_manager/managerlib.py b/src/subscription_manager/managerlib.py index 006980cffd..4e864e9379 100644 --- a/src/subscription_manager/managerlib.py +++ b/src/subscription_manager/managerlib.py @@ -856,6 +856,7 @@ def clean_all_data(backup=True): # for deleting persistent caches cache.ProfileManager.delete_cache() cache.InstalledProductsManager.delete_cache() + cache.SyspurposeCache.delete_cache() # FIXME: implement as dbus client to facts service DeleteCache() once implemented #Facts.delete_cache() diff --git a/src/subscription_manager/syspurposelib.py b/src/subscription_manager/syspurposelib.py index 577d0fe6f0..acd52d862a 100644 --- a/src/subscription_manager/syspurposelib.py +++ b/src/subscription_manager/syspurposelib.py @@ -92,7 +92,7 @@ def save_role_to_syspurpose_metadata(role): :return: None """ - if SyspurposeStore: + if 'SyspurposeStore' in globals() and SyspurposeStore is not None: store = SyspurposeStore.read(USER_SYSPURPOSE) if role is None or role == "": @@ -299,7 +299,7 @@ def record_change(self, change): key=change.key, value=change.new_value, source=source ) elif change.in_base and change.previous_value != change.new_value: - msg = "'{key}' updated from '{old_value}' to '{old_value}' due to change in {source}"\ + msg = "'{key}' updated from '{old_value}' to '{new_value}' due to change in {source}"\ .format(key=change.key, new_value=change.new_value, old_value=change.previous_value, source=source) @@ -320,19 +320,23 @@ def __init__(self): self.cp_provider = inj.require(inj.CP_PROVIDER) self.uep = self.cp_provider.get_consumer_auth_cp() - def perform(self): + def perform(self, include_result=False): """ Perform the action that this Command represents. :return: """ + result = {} try: - self.sync() + result = self.sync() except ConnectionException as e: self.report._exceptions.append('Unable to sync syspurpose with server: %s' % str(e)) self.report._status = 'Failed to sync system purpose' self.report._updates = "\n\t\t ".join(self.report._updates) log.debug("Syspurpose updated: %s" % self.report) - return self.report + if not include_result: + return self.report + else: + return self.report, result def sync(self): """ @@ -342,7 +346,7 @@ def sync(self): """ if not self.uep.has_capability('syspurpose'): log.debug('Server does not support syspurpose, not syncing') - return + return read_syspurpose() consumer_identity = inj.require(inj.IDENTITY) consumer = self.uep.getConsumer(consumer_identity.uuid) @@ -351,7 +355,10 @@ def sync(self): sp_cache = SyspurposeCache() # Translate from the remote values to the local, filtering out items not known for attr in ATTRIBUTES: - server_sp[attr] = consumer.get(LOCAL_TO_REMOTE[attr]) + value = consumer.get(LOCAL_TO_REMOTE[attr]) + if value is None: + value = "" + server_sp[attr] = value try: filesystem_sp = read_syspurpose(raise_on_error=True) @@ -371,11 +378,14 @@ def sync(self): sp_cache.write_cache() write_syspurpose(result) - - self.uep.updateConsumer(consumer_identity.uuid, role=result[ROLE], - addons=result[ADDONS], - service_level=result[SERVICE_LEVEL], - usage=result[USAGE]) + addons = result.get(ADDONS) + self.uep.updateConsumer( + consumer_identity.uuid, + role=result.get(ROLE) or "", + addons=addons if addons is not None else "", + service_level=result.get(SERVICE_LEVEL) or "", + usage=result.get(USAGE) or "" + ) self.report._status = 'Successfully synced system purpose' diff --git a/test/gui/test_registrationgui.py b/test/gui/test_registrationgui.py index ee4e32e2e6..53f65736fc 100644 --- a/test/gui/test_registrationgui.py +++ b/test/gui/test_registrationgui.py @@ -2,7 +2,7 @@ from mock import Mock, patch -from test.fixture import SubManFixture +from test.fixture import SubManFixture, set_up_mock_sp_store from test.stubs import StubBackend, StubFacts from subscription_manager.gui.registergui import RegisterWidget, RegisterInfo, \ @@ -235,6 +235,10 @@ def setUp(self): super(AsyncBackendTests, self).setUp() self.backend = StubBackend() self.asyncBackend = AsyncBackend(self.backend) + syspurpose_patch = patch('subscription_manager.syspurposelib.SyspurposeStore') + self.mock_sp_store = syspurpose_patch.start() + self.mock_sp_store, self.mock_sp_store_contents = set_up_mock_sp_store(self.mock_sp_store) + self.addCleanup(syspurpose_patch.stop) def test_auto_system_complete(self): self.backend.cp_provider.get_consumer_auth_cp().getConsumer = \ diff --git a/test/stubs.py b/test/stubs.py index 29cac73361..5e140584b4 100644 --- a/test/stubs.py +++ b/test/stubs.py @@ -462,7 +462,7 @@ def getServiceLevelList(self, owner): def updateConsumer(self, consumer, facts=None, installed_products=None, guest_uuids=None, service_level=None, release=None, autoheal=None, - content_tags=None, usage=None): + content_tags=None, addons=None, role=None, usage=None): return consumer def setEnvironmentList(self, env_list): diff --git a/test/test_managercli.py b/test/test_managercli.py index 31af80c331..178d5ee531 100644 --- a/test/test_managercli.py +++ b/test/test_managercli.py @@ -860,6 +860,10 @@ def setUp(self): argv_patcher.start() self.addCleanup(argv_patcher.stop) self.cc.cp = Mock() + syspurpose_patch = patch('subscription_manager.syspurposelib.SyspurposeStore') + self.mock_sp_store = syspurpose_patch.start() + self.mock_sp_store, self.mock_sp_store_contents = set_up_mock_sp_store(self.mock_sp_store) + self.addCleanup(syspurpose_patch.stop) def check_output_for_repos(self, output, repos): """ @@ -1454,18 +1458,9 @@ def test_org_requires_list_error(self): def test_org_requires_list_good(self): self.cc.main(["--org", "one", "--list"]) - def test_service_level_not_supported(self): - self.cc.cp.setConsumer({}) - try: - self.cc.set_service_level('JARJAR') - except SystemExit as e: - self.assertEqual(e.code, os.EX_UNAVAILABLE) - else: - self.fail("No Exception Raised") - def test_service_level_supported(self): self.cc.cp.setConsumer({'serviceLevel': 'Jarjar'}) - self.cc.set_service_level('JRJAR') + self.cc._set('JRJAR') def test_service_level_creates_syspurpose_dir_and_file(self): # create a mock /etc/rhsm/ directory, and set the value of a mock USER_SYSPURPOSE under that @@ -1475,7 +1470,7 @@ def test_service_level_creates_syspurpose_dir_and_file(self): syspurposelib.USER_SYSPURPOSE = mock_syspurpose_file self.cc.cp.setConsumer({'serviceLevel': 'Jarjar'}) - self.cc.set_service_level('JRJAR') + self.cc._set('JRJAR') self.mock_sp_store.set.assert_has_calls([call("service_level_agreement", "JRJAR")]) self.mock_sp_store.write.assert_called_once() @@ -1484,57 +1479,6 @@ def test_service_level_creates_syspurpose_dir_and_file(self): self.assertEqual(contents.get("service_level_agreement"), "JRJAR") -class TestUsageCommand(TestCliProxyCommand): - command_class = managercli.UsageCommand - - def setUp(self): - TestCliProxyCommand.setUp(self) - self.cc.consumerIdentity = StubConsumerIdentity - self.cc.cp = StubUEP() - - from subscription_manager import syspurposelib - - self.syspurposelib = syspurposelib - self.syspurposelib.USER_SYSPURPOSE = self.write_tempfile("{}").name - - syspurpose_patch = patch('subscription_manager.syspurposelib.SyspurposeStore') - self.mock_sp_store = syspurpose_patch.start() - self.mock_sp_store, self.mock_sp_store_contents = set_up_mock_sp_store(self.mock_sp_store) - self.addCleanup(syspurpose_patch.stop) - - def tearDown(self): - super(TestUsageCommand, self).tearDown() - syspurposelib.USER_SYSPURPOSE = "/etc/rhsm/syspurpose/syspurpose.json" - - def test_usage_not_supported(self): - self.cc.cp.setConsumer({}) - with self.assertRaisesRegexp(SystemExit, r'' + str(os.EX_UNAVAILABLE)): - self.cc.set_usage('JARJAR') - - def test_usage_supported(self): - self.cc.cp.setConsumer({'usage': 'Jarjar'}) - self.cc.set_usage('JRJAR') - - def test_usage_creates_syspurpose_dir_and_file(self): - # create a mock /etc/rhsm/ directory, and set the value of a mock USER_SYSPURPOSE under that - mock_etc_rhsm_dir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, mock_etc_rhsm_dir) - mock_syspurpose_file = os.path.join(mock_etc_rhsm_dir, "syspurpose/syspurpose.json") - syspurposelib.USER_SYSPURPOSE = mock_syspurpose_file - - # make sure the subdirectory 'mock_etc_rhsm_dir/syspurpose' does not exist yet: - self.mock_sp_store.get.assert_not_called() - - self.cc.cp.setConsumer({'usage': 'Jarjar'}) - self.cc.set_usage('JRJAR') - self.mock_sp_store.set.assert_has_calls([call("usage", "JRJAR")]) - self.mock_sp_store.write.assert_called_once() - - # make sure the sla has been persisted in syspurpose.json: - contents = syspurposelib.read_syspurpose() - self.assertEqual(contents.get("usage"), "JRJAR") - - class TestReleaseCommand(TestCliProxyCommand): command_class = managercli.ReleaseCommand @@ -1581,78 +1525,120 @@ class TestRoleCommand(TestCliCommand): def setUp(self): super(TestRoleCommand, self).setUp(False) self.cc = self.command_class() + self.cc.cp = StubUEP() + self.cc.cp.registered_consumer_info['role'] = None + self.cc.cp._capabilities = ["syspurpose"] def test_wrong_options_syspurpose_role(self): """It is possible to use --set or --unset options. It's not possible to use both of them together.""" + self.cc.options = Mock() + self.cc.options.set = "Foo" + self.cc.options.unset = True + self.cc.options.to_add = False try: - self.cc.main(["--set", "Foo", "--unset"]) self.cc._validate_options() except SystemExit as e: self.assertEqual(e.code, os.EX_USAGE) @patch("subscription_manager.syspurposelib.SyspurposeStore") - def test_main_no_args(self, mock_syspurpose): + @patch("subscription_manager.syspurposelib.SyspurposeCache") + def test_main_no_args(self, mock_syspurpose_cache, mock_syspurpose): """It is necessary to mock SyspurposeStore for test function of parent class""" mock_syspurpose.read = Mock() mock_syspurpose.read.return_value = Mock() instance_syspurpose_store = mock_syspurpose.read.return_value instance_syspurpose_store.contents = {'role': 'Foo'} - super(TestRoleCommand, self).test_main_no_args() + + mock_syspurpose_cache.return_value = Mock() + instance_mock_syspurpose_cache = mock_syspurpose_cache.return_value + instance_mock_syspurpose_cache.write_cache = Mock() + instance_mock_syspurpose_cache.read_cache_only = Mock(return_value={}) + + with patch.object(managercli.SyspurposeCommand, 'check_syspurpose_support', Mock(return_value=None)): + super(TestRoleCommand, self).test_main_no_args() @patch("subscription_manager.syspurposelib.SyspurposeStore") - def test_main_empty_args(self, mock_syspurpose): + @patch("subscription_manager.syspurposelib.SyspurposeCache") + def test_main_empty_args(self, mock_syspurpose_cache, mock_syspurpose): """It is necessary to mock SyspurposeStore for test function of parent class""" mock_syspurpose.read = Mock() mock_syspurpose.read.return_value = Mock() instance_syspurpose_store = mock_syspurpose.read.return_value instance_syspurpose_store.contents = {'role': 'Foo'} - super(TestRoleCommand, self).test_main_empty_args() + + mock_syspurpose_cache.return_value = Mock() + instance_mock_syspurpose_cache = mock_syspurpose_cache.return_value + instance_mock_syspurpose_cache.write_cache = Mock() + instance_mock_syspurpose_cache.read_cache_only = Mock(return_value={}) + + with patch.object(managercli.SyspurposeCommand, 'check_syspurpose_support', Mock(return_value=None)): + super(TestRoleCommand, self).test_main_empty_args() @patch("subscription_manager.syspurposelib.SyspurposeStore") - def test_display_valid_syspurpose_role(self, mock_syspurpose): + @patch("subscription_manager.syspurposelib.SyspurposeSyncActionCommand") + def test_display_valid_syspurpose_role(self, mock_syspurpose_sync, mock_syspurpose): mock_syspurpose.read = Mock() mock_syspurpose.read.return_value = Mock() instance_syspurpose_store = mock_syspurpose.read.return_value instance_syspurpose_store.contents = {'role': 'Foo'} - self.cc.options = Mock() - self.cc.options.set_role = None - self.cc.options.unset_role = False + self.cc.options = Mock(spec=['set', 'unset']) + self.cc.options.set = None + self.cc.options.unset = False + + mock_syspurpose_sync.return_value = Mock() + instance_mock_syspurpose_sync = mock_syspurpose_sync.return_value + instance_mock_syspurpose_sync.perform = Mock(return_value=({}, {"role": "Foo"})) with Capture() as cap: self.cc._do_command() - self.assertIn("System purpose role: Foo", cap.out) + self.assertIn("Current Role: Foo", cap.out) @patch("subscription_manager.syspurposelib.SyspurposeStore") - def test_display_none_syspurpose_role(self, mock_syspurpose): + @patch("subscription_manager.syspurposelib.SyspurposeCache") + def test_display_none_syspurpose_role(self, mock_syspurpose_cache, mock_syspurpose): mock_syspurpose.read = Mock() mock_syspurpose.read.return_value = Mock() instance_syspurpose_store = mock_syspurpose.read.return_value instance_syspurpose_store.contents = {'role': None} - self.cc.options = Mock() - self.cc.options.set_role = None - self.cc.options.unset_role = False + mock_syspurpose_cache.return_value = Mock() + instance_mock_syspurpose_cache = mock_syspurpose_cache.return_value + instance_mock_syspurpose_cache.write_cache = Mock() + instance_mock_syspurpose_cache.read_cache_only = Mock(return_value={}) + + self.cc.options = Mock(spec=['set', 'unset']) + self.cc.options.set = None + self.cc.options.unset = False with Capture() as cap: self.cc._do_command() - self.assertIn("This system does not have any system purpose role", cap.out) + self.assertIn("Role not set", cap.out) @patch("subscription_manager.syspurposelib.SyspurposeStore") - def test_display_nonexisting_syspurpose_role(self, mock_syspurpose): + @patch("subscription_manager.syspurposelib.SyspurposeCache") + def test_display_nonexisting_syspurpose_role(self, mock_syspurpose_cache, mock_syspurpose): mock_syspurpose.read = Mock() mock_syspurpose.read.return_value = Mock() instance_syspurpose_store = mock_syspurpose.read.return_value instance_syspurpose_store.contents = {} - self.cc.options = Mock() - self.cc.options.set_role = None - self.cc.options.unset_role = False + self.cc.options = Mock(spec=['set', 'unset']) + self.cc.options.set = None + self.cc.options.unset = False + + mock_syspurpose_cache.return_value = Mock() + instance_mock_syspurpose_cache = mock_syspurpose_cache.return_value + instance_mock_syspurpose_cache.write_cache = Mock() + instance_mock_syspurpose_cache.read_cache_only = Mock(return_value={}) + with Capture() as cap: self.cc._do_command() - self.assertIn("This system does not have any system purpose role", cap.out) + self.assertIn("Role not set.", cap.out) @patch("subscription_manager.syspurposelib.SyspurposeStore") - def test_setting_syspurpose_role(self, mock_syspurpose): + @patch("subscription_manager.syspurposelib.SyspurposeSyncActionCommand") + @patch("subscription_manager.syspurposelib.SyspurposeCache") + def test_setting_syspurpose_role(self, mock_syspurpose_cache, mock_syspurpose_sync, mock_syspurpose): mock_syspurpose.read = Mock() mock_syspurpose.read.return_value = Mock() instance_syspurpose_store = mock_syspurpose.read.return_value @@ -1660,18 +1646,29 @@ def test_setting_syspurpose_role(self, mock_syspurpose): instance_syspurpose_store.set = MagicMock(return_value=True) instance_syspurpose_store.write = MagicMock(return_value=None) - self.cc.options = Mock() - self.cc.options.set_role = 'Foo' - self.cc.options.unset_role = False + mock_syspurpose_sync.return_value = Mock() + instance_mock_syspurpose_sync = mock_syspurpose_sync.return_value + instance_mock_syspurpose_sync.perform = Mock(return_value=({}, {"role": "Foo"})) + + mock_syspurpose_cache.return_value = Mock() + instance_mock_syspurpose_cache = mock_syspurpose_cache.return_value + instance_mock_syspurpose_cache.write_cache = Mock() + instance_mock_syspurpose_cache.read_cache_only = Mock(return_value={}) + + self.cc.options = Mock(spec=['set', 'unset']) + self.cc.options.set = 'Foo' + self.cc.options.unset = False with Capture() as cap: self.cc._do_command() - self.assertIn("System Purpose role has been set to: Foo", cap.out) + self.assertIn('role set to "Foo"', cap.out) instance_syspurpose_store.set.assert_called_once_with('role', 'Foo') instance_syspurpose_store.write.assert_called_once() @patch("subscription_manager.syspurposelib.SyspurposeStore") - def test_unsetting_syspurpose_role(self, mock_syspurpose): + @patch("subscription_manager.syspurposelib.SyspurposeSyncActionCommand") + @patch("subscription_manager.syspurposelib.SyspurposeCache") + def test_unsetting_syspurpose_role(self, mock_syspurpose_cache, mock_syspurpose_sync, mock_syspurpose): mock_syspurpose.read = Mock() mock_syspurpose.read.return_value = Mock() instance_syspurpose_store = mock_syspurpose.read.return_value @@ -1679,13 +1676,22 @@ def test_unsetting_syspurpose_role(self, mock_syspurpose): instance_syspurpose_store.unset = MagicMock(return_value=True) instance_syspurpose_store.write = MagicMock(return_value=None) - self.cc.options = Mock() - self.cc.options.set_role = None - self.cc.options.unset_role = True + mock_syspurpose_sync.return_value = Mock() + instance_mock_syspurpose_sync = mock_syspurpose_sync.return_value + instance_mock_syspurpose_sync.perform = Mock(return_value=({}, {"role": ""})) + + mock_syspurpose_cache.return_value = Mock() + instance_mock_syspurpose_cache = mock_syspurpose_cache.return_value + instance_mock_syspurpose_cache.write_cache = Mock() + instance_mock_syspurpose_cache.read_cache_only = Mock(return_value={}) + + self.cc.options = Mock(spec=['set', 'unset']) + self.cc.options.set = None + self.cc.options.unset = True with Capture() as cap: self.cc._do_command() - self.assertIn("System Purpose role has been unset", cap.out) + self.assertIn("role unset", cap.out) instance_syspurpose_store.unset.assert_called_once_with('role') instance_syspurpose_store.write.assert_called_once() diff --git a/test/test_syspurposelib.py b/test/test_syspurposelib.py index dade574503..c63afc088c 100644 --- a/test/test_syspurposelib.py +++ b/test/test_syspurposelib.py @@ -223,7 +223,7 @@ def test_sync_missing_capability(self, mock_read_sp, mock_cache, mock_merge, moc mock_merge.assert_not_called() mock_write.assert_not_called() - self.assertEqual(result, None) + self.assertEqual(result, self.local_sp) update.assert_not_called()