diff --git a/test/test_origin_pattern.py b/test/test_origin_pattern.py index 85cb15b2..618a8f53 100755 --- a/test/test_origin_pattern.py +++ b/test/test_origin_pattern.py @@ -8,9 +8,9 @@ import unattended_upgrade from unattended_upgrade import ( check_changes_for_sanity, - is_in_allowed_origin, + ver_in_origins, get_distro_codename, - match_whitelist_string, + match_pattern_string, UnknownMatcherError, ) @@ -31,6 +31,10 @@ class MockPackage(): pass +class MockVersion(): + pass + + class MockCache(dict): def __iter__(self): for pkgname in self.keys(): @@ -43,6 +47,8 @@ def get_changes(self): blacklist = [] # type: List[str] whitelist = [] # type: List[str] strict_whitelist = False # type: bool + blacklisted_origins = [] # type: List[str] + strict_blacklist = True # type: bool class MockDepCache(): @@ -51,35 +57,35 @@ class MockDepCache(): class TestOriginPatern(TestBase): - def test_match_whitelist_string(self): + def test_match_pattern_string(self): origin = self._get_mock_origin( "OriginUbuntu", "LabelUbuntu", "ArchiveUbuntu", "archive.ubuntu.com", "main") # good s = "o=OriginUbuntu" - self.assertTrue(match_whitelist_string(s, origin)) + self.assertTrue(match_pattern_string(s, origin)) s = "o=OriginUbuntu,l=LabelUbuntu,a=ArchiveUbuntu," \ "site=archive.ubuntu.com" - self.assertTrue(match_whitelist_string(s, origin)) + self.assertTrue(match_pattern_string(s, origin)) # bad s = "" - self.assertFalse(match_whitelist_string(s, origin)) + self.assertFalse(match_pattern_string(s, origin)) s = "o=something" - self.assertFalse(match_whitelist_string(s, origin)) + self.assertFalse(match_pattern_string(s, origin)) s = "o=LabelUbuntu,a=no-match" - self.assertFalse(match_whitelist_string(s, origin)) + self.assertFalse(match_pattern_string(s, origin)) # with escaping origin = self._get_mock_origin("Google, Inc.", archive="stable") # good s = "o=Google\\, Inc.,a=stable" - self.assertTrue(match_whitelist_string(s, origin)) + self.assertTrue(match_pattern_string(s, origin)) def test_match_whitelist_from_conffile(self): # read some apt_pkg.config.clear("Unattended-Upgrade") apt_pkg.read_config_file( apt_pkg.config, "./data/50unattended-upgrades.Test") - allowed_origins = unattended_upgrade.get_allowed_origins() + allowed_origins = unattended_upgrade.get_origins_from_conf() # print allowed_origins self.assertTrue("o=aOrigin,a=aArchive" in allowed_origins) self.assertTrue("s=aSite,l=aLabel" in allowed_origins) @@ -89,25 +95,25 @@ def test_macro(self): codename = get_distro_codename() s = "a=${distro_codename}" origin = self._get_mock_origin("Foo", archive=codename) - self.assertTrue(match_whitelist_string(s, origin)) + self.assertTrue(match_pattern_string(s, origin)) def test_compatiblity(self): apt_pkg.config.clear("Unattended-Upgrade") apt_pkg.read_config_file( apt_pkg.config, "./data/50unattended-upgrades.compat") - allowed_origins = unattended_upgrade.get_allowed_origins() + allowed_origins = unattended_upgrade.get_origins_from_conf() # print allowed_origins self.assertTrue("o=Google\\, Inc.,a=stable" in allowed_origins) self.assertTrue("o=MoreCorp\\, eink,a=stable" in allowed_origins) # test whitelist pkg = self._get_mock_package() - self.assertTrue(is_in_allowed_origin(pkg.candidate, allowed_origins)) + self.assertTrue(ver_in_origins(pkg.candidate, allowed_origins)) def test_escaped_colon(self): apt_pkg.config.clear("Unattended-Upgrade") apt_pkg.read_config_file( apt_pkg.config, "./data/50unattended-upgrades.colon") - allowed_origins = unattended_upgrade.get_allowed_origins() + allowed_origins = unattended_upgrade.get_origins_from_conf() self.assertIn('o=http://foo.bar,a=stable', allowed_origins) @@ -115,7 +121,7 @@ def test_unkown_matcher(self): apt_pkg.config.clear("Unattended-Upgrade") s = "xxx=OriginUbuntu" with self.assertRaises(UnknownMatcherError): - self.assertTrue(match_whitelist_string(s, None)) + self.assertTrue(match_pattern_string(s, None)) def test_blacklist(self): # get the mocks @@ -179,6 +185,8 @@ def _get_mock_package(self, name="foo"): self._get_mock_origin(aorigin="Google, Inc.", archive="stable")] pkg.candidate.record = {} + pkg.versions = [MockVersion()] + pkg.versions[0].origins = pkg.candidate.origins return pkg def test_match_whitelist_wildcard(self): @@ -187,16 +195,16 @@ def test_match_whitelist_wildcard(self): "archive.ubuntu.com", "main") # good s = "o=OriginU*" - self.assertTrue(match_whitelist_string(s, origin)) + self.assertTrue(match_pattern_string(s, origin)) # bad s = "o=X*" - self.assertFalse(match_whitelist_string(s, origin)) + self.assertFalse(match_pattern_string(s, origin)) # good s = "o=?riginUbunt?" - self.assertTrue(match_whitelist_string(s, origin)) + self.assertTrue(match_pattern_string(s, origin)) # good s = "o=*Ubunt?" - self.assertTrue(match_whitelist_string(s, origin)) + self.assertTrue(match_pattern_string(s, origin)) def test_get_allowed_origins_legacy(self): for cfg, (distro_id, distro_codename) in ( diff --git a/test/test_substitute.py b/test/test_substitute.py index a9036983..5f2aa6ac 100755 --- a/test/test_substitute.py +++ b/test/test_substitute.py @@ -6,7 +6,7 @@ import apt_pkg -from unattended_upgrade import substitute, get_allowed_origins +from unattended_upgrade import substitute, get_origins_from_conf from test.test_base import TestBase @@ -24,11 +24,11 @@ def testSubstitute(self): self.assertTrue(substitute("${distro_id}"), "MyDistroID") def test_get_allowed_origins_with_substitute(self): - """ test if substitute for get_allowed_origins works """ + """ test if substitute for get_origins_from_conf works """ apt_pkg.config.clear("Unattended-Upgrade::Allowed-Origins") apt_pkg.config.set("Unattended-Upgrade::Allowed-Origins::", "${distro_id} ${distro_codename}-security") - li = get_allowed_origins() + li = get_origins_from_conf() self.assertIn("o=MyDistroID,a=mycodename-security", li) diff --git a/unattended-upgrade b/unattended-upgrade index d4956738..8e059996 100755 --- a/unattended-upgrade +++ b/unattended-upgrade @@ -84,6 +84,12 @@ from textwrap import wrap import apt import apt_inst import apt_pkg +from apt.package import Origin, Package, Version +Origin # pyflakes +Package # pyflakes +Version # pyflakes +from apt_pkg import PackageFile +PackageFile # pyflakes import distro_info @@ -133,6 +139,9 @@ logged_msgs = set() # type: AbstractSet[str] NEVER_PIN = -32768 +ORIGIN_BLACKLIST_KEY = "Unattended-Upgrade::Blacklist-Origins-Pattern" +ORIGIN_ALLOWED_KEY = "Unattended-Upgrade::Origins-Pattern" + class LoggingDateTime: """The date/time representation for the dpkg log file timestamps""" @@ -155,7 +164,7 @@ class UnknownMatcherError(ValueError): pass -class NoAllowedOriginError(ValueError): +class NoMatchedOriginError(ValueError): pass @@ -169,10 +178,18 @@ class UnattendedUpgradesCache(apt.Cache): # type: (str) -> None self._cached_candidate_pkgnames = set() # type: Set[str] - self.allowed_origins = get_allowed_origins() + self.allowed_origins = get_origins_from_conf(ORIGIN_ALLOWED_KEY) logging.info(_("Allowed origins are: %s"), ", ".join(self.allowed_origins)) + self.blacklisted_origins = get_origins_from_conf(ORIGIN_BLACKLIST_KEY) + logging.info(_("Blacklisted origins are: %s"), + ", ".join(self.blacklisted_origins)) + + self.strict_blacklist = apt_pkg.config.find_b( + "Unattended-Upgrade::Blacklist-Origins-Strict", True + ) + self.blacklist = apt_pkg.config.value_list( "Unattended-Upgrade::Package-Blacklist") logging.info(_("Initial blacklist: %s"), " ".join(self.blacklist)) @@ -210,7 +227,8 @@ class UnattendedUpgradesCache(apt.Cache): for v in pkg.versions: if pkg.installed < v \ and pkg.installed.policy_priority <= v.policy_priority \ - and is_in_allowed_origin(v, self.allowed_origins): + and (ver_in_origins(v, self.allowed_origins) + or ver_in_origins(v, self.blacklisted_origins)): return v return None @@ -226,26 +244,30 @@ class UnattendedUpgradesCache(apt.Cache): for pkg in self: better_version = self.find_better_version(pkg) if better_version: - logging.info(self.kept_package_excuse(pkg._pkg, + logging.info(self.kept_package_excuse(pkg, self.blacklist, self.whitelist, self.strict_whitelist, - better_version)) + better_version, + self.blacklisted_origins)) kept_packages.add(pkg, better_version, self) return kept_packages - def kept_package_excuse(self, pkg, # apt.Package - blacklist, # type: List[str] - whitelist, # type: List[str] - strict_whitelist, # type: bool - better_version # type: apt.package.Version + def kept_package_excuse(self, pkg, # type: Package + blacklist, # type: List[str] + whitelist, # type: List[str] + strict_whitelist, # type: bool + better_version, # type: apt.package.Version + blacklisted_origins # type: List[str] ): # type: (...) -> str """ Log the excuse the package is kept back for """ - if pkg.selected_state == apt_pkg.SELSTATE_HOLD: + if pkg._pkg.selected_state == apt_pkg.SELSTATE_HOLD: return _("Package %s is marked to be held back.") % pkg.name elif is_pkgname_in_blacklist(pkg.name, blacklist): return _("Package %s is blacklisted.") % pkg.name + elif pkg_in_origins(pkg, blacklisted_origins, self.strict_blacklist): + return _("Package %s is blacklisted (by origin)") % pkg.name elif whitelist: if strict_whitelist: if not is_pkgname_in_whitelist(pkg.name, whitelist): @@ -290,11 +312,14 @@ class UnattendedUpgradesCache(apt.Cache): pins = [] # type: List[Union[PkgPin, PkgFilePin]] - # mark not allowed origins with 'never' pin + # mark not configured origins with 'never' pin for pkg_file in self._cache.file_list: # type: ignore - if not is_allowed_origin(pkg_file, self.allowed_origins): - # Set the magic 'never' pin on not allowed origins - logging.debug("Marking not allowed %s with %s pin", pkg_file, + if (not origin_in_patterns(pkg_file, self.allowed_origins) + and not origin_in_patterns(pkg_file, + self.blacklisted_origins)): + # Set the magic 'never' pin on not configured origins + logging.debug("Marking not configured %s with %s pin", + pkg_file, NEVER_PIN) pins.append(PkgFilePin(pkg_file.id, NEVER_PIN)) # TODO(rbalint) pin not trusted origins with NEVER_PIN @@ -310,20 +335,28 @@ class UnattendedUpgradesCache(apt.Cache): pins.extend(self.pinning_from_regex_list( # type: ignore self.whitelist, 900)) if self.strict_whitelist: - policy = self._depcache.policy # pin down already pinned packages which are not on the whitelist # to not install locally pinned up packages accidentally - for pkg in self._cache.packages: - if pkg.has_versions: - pkg_ver = policy.get_candidate_ver(pkg) # type: ignore + for pkg in self: + if pkg.versions: + pkg_ver = pkg.candidate # type: ignore if pkg_ver is not None \ - and policy.get_priority(pkg_ver) > -1: + and pkg_ver.policy_priority > -1: # the pin is higher than set for allowed origins, thus # there is extra pinning configuration if not is_pkgname_in_whitelist(pkg.name, self.whitelist): pins.append(PkgPin(pkg.name, NEVER_PIN)) + # hold back packages from ignored origins + if self.blacklisted_origins: + for pkg in self: + if pkg.is_installed and \ + pkg_in_origins(pkg, + self.blacklisted_origins, + self.strict_blacklist): + pins.append(PkgPin(pkg.name, -1)) + return pins def apply_pinning(self, pins): @@ -357,7 +390,7 @@ class UnattendedUpgradesCache(apt.Cache): updates pocket with a higher version number """ try: - new_cand = ver_in_allowed_origin(pkg, self.allowed_origins) + new_cand = get_highest_ver(pkg, self.allowed_origins) # Only adjust to lower versions to avoid flipping back and forth # and to avoid picking a newer version, not selected by apt. # This helps avoiding upgrades to experimental's packages. @@ -369,7 +402,7 @@ class UnattendedUpgradesCache(apt.Cache): return True else: return False - except NoAllowedOriginError: + except NoMatchedOriginError: return False def call_checked(self, function, pkg, **kwargs): @@ -397,8 +430,12 @@ class UnattendedUpgradesCache(apt.Cache): """ new_pkgs_to_adjust = [] # List[str] - if not is_pkg_change_allowed(pkg, self.blacklist, self.whitelist, - self.strict_whitelist): + if not is_pkg_change_allowed(pkg, + self.blacklist, + self.whitelist, + self.strict_whitelist, + self.blacklisted_origins, + self.strict_blacklist): return if function == apt.package.Package.mark_upgrade \ @@ -432,18 +469,17 @@ class UnattendedUpgradesCache(apt.Cache): for marked_pkg in self.get_changes(): if marked_pkg.name in self._cached_candidate_pkgnames: continue - if not is_in_allowed_origin(marked_pkg.candidate, - self.allowed_origins): + if not ver_in_origins(marked_pkg.candidate, + self.allowed_origins): try: - ver_in_allowed_origin(marked_pkg, - self.allowed_origins) + get_highest_ver(marked_pkg, self.allowed_origins) # important! this avoids downgrades below if pkg.is_installed and not pkg.is_upgradable and \ apt_pkg.config.find_b("Unattended-Upgrade::Allow-" "downgrade", False): continue new_pkgs_to_adjust.append(marked_pkg) - except NoAllowedOriginError: + except NoMatchedOriginError: pass if new_pkgs_to_adjust: @@ -624,13 +660,14 @@ class KeptPkgs(defaultdict): Packages to keep by highest allowed pretty-printed origin """ - def add(self, pkg, # type: apt.Package - version, # type: apt.package.Version + def add(self, pkg, # type: Package + version, # type: Version cache # type: UnattendedUpgradesCache ): # type: (...) -> None for origin in version.origins: - if is_allowed_origin(origin, cache.allowed_origins): + if any([origin_in_patterns(origin, cache.allowed_origins), + origin_in_patterns(origin, cache.blacklisted_origins)]): self[origin.origin + " " + origin.archive].add(pkg.name) return @@ -811,38 +848,41 @@ def get_allowed_origins_legacy(): return allowed_origins -def get_allowed_origins(): - # type: () -> List[str] - """ return a list of allowed origins from apt.conf +def get_origins_from_conf(conf_key=ORIGIN_ALLOWED_KEY): + # type: (str) -> List[str] + """ return a list of origins from apt.conf This will take substitutions (like distro_id) into account. """ - allowed_origins = get_allowed_origins_legacy() - key = "Unattended-Upgrade::Origins-Pattern" + if conf_key == ORIGIN_ALLOWED_KEY: + origins = get_allowed_origins_legacy() + else: + origins = [] + try: - for s in apt_pkg.config.value_list(key): - allowed_origins.append(substitute(s)) + for s in apt_pkg.config.value_list(conf_key): + origins.append(substitute(s)) except ValueError: - logging.error(_("Unable to parse %s." % key)) + logging.error(_("Unable to parse %s." % conf_key)) raise - return allowed_origins + return origins -def match_whitelist_string(whitelist, origin): +def match_pattern_string(pattern, origin): # type: (str, Union[apt.package.Origin, apt_pkg.PackageFile]) -> bool """ - take a whitelist string in the form "origin=Debian,label=Debian-Security" - and match against the given python-apt origin. A empty whitelist string + take a pattern string in the form "origin=Debian,label=Debian-Security" + and match against the given python-apt origin. A empty pattern string never matches anything. """ - whitelist = whitelist.strip() - if whitelist == "": + pattern = pattern.strip() + if pattern == "": logging.warning("empty match string matches nothing") return False res = True # make "\," the html quote equivalent - whitelist = whitelist.replace("\\,", "%2C") - for token in whitelist.split(","): + pattern = pattern.replace("\\,", "%2C") + for token in pattern.split(","): # strip and unquote the "," back (what, value) = [s.strip().replace("%2C", ",") for s in token.split("=")] @@ -997,36 +1037,49 @@ def upgrade_in_minimal_steps(cache, # type: UnattendedUpgradesCache return res -def is_allowed_origin(origin, allowed_origins): - # type: (Union[apt.package.Origin, apt_pkg.PackageFile], List[str]) -> bool - - # local origin is allowed by default +def origin_in_patterns(origin, patterns, match_local=True): + # type: (Union[Origin, PackageFile], List[str], bool) -> bool + # local origin is matched by default if origin.component == 'now' and origin.archive == 'now' and \ not origin.label and not origin.site: - return True - for allowed in allowed_origins: - if match_whitelist_string(allowed, origin): + return True and match_local + for pattern in patterns: + if match_pattern_string(pattern, origin): return True return False -def is_in_allowed_origin(ver, allowed_origins): - # type: (apt.package.Version, List[str]) -> bool +def ver_in_origins(ver, patterns): + # type: (Version, List[str]) -> bool if not ver: return False for origin in ver.origins: - if is_allowed_origin(origin, allowed_origins): + if origin_in_patterns(origin, patterns): return True return False -def ver_in_allowed_origin(pkg, allowed_origins): - # type: (apt.Package, List[str]) -> apt.package.Version +def get_highest_ver(pkg, patterns): + # type: (apt.Package, List[str]) -> Version for ver in pkg.versions: - if is_in_allowed_origin(ver, allowed_origins): + if ver_in_origins(ver, patterns): # leave as soon as we have the highest new candidate return ver - raise NoAllowedOriginError() + raise NoMatchedOriginError() + + +def pkg_in_origins(pkg, patterns, strict_blacklist=True): + # type: (apt.Package, List[str], bool) -> bool + + for v in pkg.versions: + for o in v.origins: + if o.site: + if origin_in_patterns(o, patterns): + if pkg.installed and strict_blacklist and \ + pkg.installed > get_highest_ver(pkg, patterns): + return False + return True + return False def is_pkgname_in_blacklist(pkgname, blacklist): @@ -1051,11 +1104,19 @@ def is_pkgname_in_whitelist(pkgname, whitelist): return False -def is_pkg_change_allowed(pkg, blacklist, whitelist, strict_whitelist): - # type: (apt.Package, List[str], List[str], bool) -> bool +def is_pkg_change_allowed(pkg, + blacklist, + whitelist, + strict_whitelist, + blacklisted_origins, + strict_blacklist): + # type: (apt.Package, List[str], List[str], bool, List[str], bool) -> bool if is_pkgname_in_blacklist(pkg.name, blacklist): logging.debug("pkg %s package has been blacklisted" % pkg.name) return False + if pkg_in_origins(pkg, blacklisted_origins, strict_blacklist): + logging.debug("pkg %s package is blacklisted (by origin)" % pkg.name) + return False # a strict whitelist will not allow any changes not in the # whitelist, most people will want the relaxed whitelist # that whitelists a package but pulls in the package @@ -1146,12 +1207,16 @@ def sanity_problem(cache, desired_pkg): # origin so its good enough if we have a single trusted one if not any([o.trusted for o in pkg.candidate.origins]): return ("pkg %s is not from a trusted origin" % pkg.name) - if not is_in_allowed_origin(pkg.candidate, cache.allowed_origins): - return ("pkg %s is not in an allowed origin" % pkg.name) + if not ver_in_origins(pkg.candidate, cache.allowed_origins) \ + and not ver_in_origins(pkg.candidate, + cache.blacklisted_origins): + return ("pkg %s is not in an configured origin" % pkg.name) if not is_pkg_change_allowed(pkg, cache.blacklist, cache.whitelist, - cache.strict_whitelist): + cache.strict_whitelist, + cache.blacklisted_origins, + cache.strict_blacklist): return ("pkg %s is blacklisted or is not whitelisted" % pkg.name) # check if the package is unsafe to upgrade unattended @@ -1741,7 +1806,7 @@ def try_to_upgrade(pkg, # type: apt.Package if not pkg.is_upgradable and not apt_pkg.config.find_b( "Unattended-Upgrade::Allow-downgrade", False): return - except NoAllowedOriginError: + except NoMatchedOriginError: return cache._cached_candidate_pkgnames.add(pkg.name) cache.mark_upgrade_adjusted(pkg, from_user=not pkg.is_auto_installed) @@ -1750,7 +1815,7 @@ def try_to_upgrade(pkg, # type: apt.Package pkgs_to_upgrade.append(pkg) else: rewind_cache(cache, pkgs_to_upgrade) - except (SystemError, NoAllowedOriginError) as e: + except (SystemError, NoMatchedOriginError) as e: # can't upgrade logging.warning( _("package %s upgradable but fails to " @@ -1783,8 +1848,8 @@ def calculate_upgradable_pkgs(cache, # type: UnattendedUpgradesCache if (pkg.is_upgradable or candidate_version_changed(pkg) and is_pkgname_in_whitelist(pkg.name, cache.whitelist)): try: - ver_in_allowed_origin(pkg, cache.allowed_origins) - except NoAllowedOriginError: + get_highest_ver(pkg, cache.allowed_origins) + except NoMatchedOriginError: continue try_to_upgrade(pkg, pkgs_to_upgrade, @@ -1843,8 +1908,12 @@ def is_autoremove_valid(cache, # type: UnattendedUpgradesCache return True pkgnames = {pkg.name for pkg in changes} for pkg in changes: - if not is_pkg_change_allowed(pkg, cache.blacklist, cache.whitelist, - cache.strict_whitelist): + if not is_pkg_change_allowed(pkg, + cache.blacklist, + cache.whitelist, + cache.strict_whitelist, + cache.blacklisted_origins, + cache.strict_blacklist): logging.warning( _("Keeping the following auto-removable package(s) because " "they include %s which is set to be kept unmodified: %s"),