diff --git a/dockerfile_parse/parser.py b/dockerfile_parse/parser.py index 1fe07f0..9f740c1 100644 --- a/dockerfile_parse/parser.py +++ b/dockerfile_parse/parser.py @@ -16,7 +16,7 @@ from .constants import DOCKERFILE_FILENAME, COMMENT_INSTRUCTION from .util import (b2u, extract_key_values, get_key_val_dictionary, - u2b, Context, WordSplitter) + u2b, Context, WordSplitter, ImageName) logger = logging.getLogger(__name__) @@ -877,7 +877,9 @@ def image_from(from_value): )? """) match = re.match(regex, from_value) - return match.group('image', 'name') if match else (None, None) + image = ImageName.parse(match.group('image')) if match else None + name = match.group('name') if match else None + return image, name def _endline(line): diff --git a/dockerfile_parse/util.py b/dockerfile_parse/util.py index 1e2e298..c479924 100644 --- a/dockerfile_parse/util.py +++ b/dockerfile_parse/util.py @@ -48,7 +48,7 @@ def __init__(self, s, args=None, envs=None): :param envs: dict, environment variables to use; if None, do not attempt substitution """ - self.stream = StringIO(s) + self.stream = StringIO(str(s)) self.args = args self.envs = envs @@ -326,3 +326,121 @@ def get_values(self, context_type): if context_type.upper() == "LABEL": return self.labels raise ValueError("Unexpected context type: " + context_type) + + +class ImageName(object): + """Represent an image. + Naming Conventions + ================== + registry.somewhere/namespace/image_name:tag + |-----------------| registry, reg_uri + |---------| namespace + |--------------------------------------| repository + |--------------------| image name + |--| tag + |------------------------| image + |------------------------------------------| image + """ + + def __init__(self, registry=None, namespace=None, repo=None, tag=None): + self.registry = registry + self.namespace = namespace + self.repo = repo + self.tag = tag + + @classmethod + def parse(cls, image_name): + result = cls() + + if not image_name or str(image_name).isspace(): + return ImageName() + + if isinstance(image_name, cls): + return image_name + + # registry.org/namespace/repo:tag + s = image_name.split('/', 2) + + if len(s) == 2: + if '.' in s[0] or ':' in s[0]: + result.registry = s[0] if s[0] else None + else: + result.namespace = s[0] + elif len(s) == 3: + result.registry = s[0] if s[0] else None + result.namespace = s[1] + result.repo = s[-1] + + for sep in '@:': + try: + result.repo, result.tag = result.repo.rsplit(sep, 1) + except ValueError: + continue + break + + return result + + def to_str(self, registry=True, tag=True, explicit_tag=False, + explicit_namespace=False): + if self.repo is None: + raise RuntimeError('No image repository specified') + + result = self.get_repo(explicit_namespace) + + if tag and self.tag and ':' in self.tag: + result = '{0}@{1}'.format(result, self.tag) + elif tag and self.tag: + result = '{0}:{1}'.format(result, self.tag) + elif tag and explicit_tag: + result = '{0}:{1}'.format(result, 'latest') + + if registry and self.registry: + result = '{0}/{1}'.format(self.registry, result) + + return result + + def get_repo(self, explicit_namespace=False): + result = self.repo + if self.namespace: + result = '{0}/{1}'.format(self.namespace, result) + elif explicit_namespace: + result = '{0}/{1}'.format('library', result) + return result + + def enclose(self, organization): + if self.namespace == organization: + return + + repo_parts = [self.repo] + if self.namespace: + repo_parts.insert(0, self.namespace) + + self.namespace = organization + self.repo = '-'.join(repo_parts) + + def __str__(self): + return self.to_str(registry=True, tag=True) + + def __repr__(self): + return ( + "ImageName(registry={s.registry!r}, namespace={s.namespace!r}," + " repo={s.repo!r}, tag={s.tag!r})" + ).format(s=self) + + def __eq__(self, other): + if isinstance(other, str): + return self.__str__() == other + elif isinstance(other, ImageName): + return self.__dict__ == other.__dict__ + else: + return NotImplemented + + def __hash__(self): + return hash(self.to_str()) + + def copy(self): + return ImageName( + registry=self.registry, + namespace=self.namespace, + repo=self.repo, + tag=self.tag) diff --git a/tests/test_parser.py b/tests/test_parser.py index 5fc2888..dd07afc 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -19,7 +19,7 @@ from dockerfile_parse import DockerfileParser from dockerfile_parse.parser import image_from from dockerfile_parse.constants import COMMENT_INSTRUCTION -from dockerfile_parse.util import b2u, u2b, Context +from dockerfile_parse.util import b2u, u2b, Context, ImageName from tests.fixtures import dfparser, instruction NON_ASCII = "žluťoučký" @@ -28,6 +28,116 @@ instruction = instruction # pylint: disable=self-assigning-variable +@pytest.mark.parametrize(('image_string', 'dictionary'), [ + ( + " ", + {"namespace": None, "registry": None, "tag": None, "repo": None}, + ), ( + "registry.org/namespace/repo:tag", + {"namespace": "namespace", "registry": "registry.org", "tag": "tag", "repo": "repo"}, + ), ( + "/namespace/repo:tag", + {"namespace": "namespace", "registry": None, "tag": "tag", "repo": "repo"}, + ), ( + "registry.org/repo:tag", + {"namespace": None, "registry": "registry.org", "tag": "tag", "repo": "repo"}, + ), ( + "registry.org/repo", + {"namespace": None, "registry": "registry.org", "tag": None, "repo": "repo"}, + ), ( + "registry.org/repo@sha256:hash", + {"namespace": None, "registry": "registry.org", "tag": "sha256:hash", "repo": "repo"}, + ) +]) +class TestImageName(object): + def test_util_image_name_parse(self, image_string, dictionary): + image = ImageName.parse(image_string) + assert image.namespace == dictionary["namespace"] + assert image.registry == dictionary["registry"] + assert image.tag == dictionary["tag"] + assert image.repo == dictionary["repo"] + + def test_util_image_name_get_repo(self, image_string, dictionary): + image = ImageName.parse(image_string) + repo = "/".join(filter(None, (dictionary["namespace"], dictionary["repo"]))) + assert image.get_repo() == (repo if repo != "" else None) + assert image.get_repo(explicit_namespace=True) == "{0}/{1}".format( + dictionary["namespace"] if dictionary["namespace"] else "library", dictionary["repo"]) + + def test_util_image_name_to_str(self, image_string, dictionary): + image = ImageName.parse(image_string) + if dictionary["repo"] is None: + with pytest.raises(RuntimeError): + image.to_str() + else: + assert image.to_str() == image_string.lstrip('/') + assert image.to_str() == (image_string.lstrip('/') if image_string else None) + assert image.to_str(explicit_tag=True) == \ + image_string.lstrip('/') + (':latest' if dictionary["tag"] is None else "") + + def test_image_name_hash(self, image_string, dictionary): + image = ImageName.parse(image_string) + if dictionary["repo"] is None: + with pytest.raises(RuntimeError): + hash(image) + else: + hash(image) + + def test_image_name_repr(self, image_string, dictionary): + # so linter won't trip on unused argument + del dictionary + image = ImageName.parse(image_string) + repr(image) + + def test_image_name_comparison(self, image_string, dictionary): + # make sure that "==" is implemented correctly on both Python major releases + i1 = ImageName.parse(image_string) + i2 = ImageName(registry=dictionary["registry"], namespace=dictionary["namespace"], + repo=dictionary["repo"], + tag=dictionary["tag"]) + assert i1 == i2 + + i2 = ImageName(registry='foo.com', namespace='spam', repo='bar', tag='2') + # pylint: disable=unneeded-not + assert not i1 == i2 + + i1 = ImageName.parse(i2) + assert i1 == i2 + + i1 = i2.copy() + assert i1 == i2 + + +@pytest.mark.parametrize(('repo', 'organization', 'enclosed_repo'), ( + ('fedora', 'spam', 'spam/fedora'), + ('spam/fedora', 'spam', 'spam/fedora'), + ('spam/fedora', 'maps', 'maps/spam-fedora'), +)) +@pytest.mark.parametrize('registry', ( + 'example.registry.com', + 'example.registry.com:8888', + None, +)) +@pytest.mark.parametrize('tag', ('bacon', None)) +def test_image_name_enclose(repo, organization, enclosed_repo, registry, tag): + reference = repo + if tag: + reference = '{}:{}'.format(repo, tag) + if registry: + reference = '{}/{}'.format(registry, reference) + + image_name = ImageName.parse(reference) + assert image_name.get_repo() == repo + assert image_name.registry == registry + assert image_name.tag == tag + + image_name.enclose(organization) + assert image_name.get_repo() == enclosed_repo + # Verify that registry and tag are unaffected + assert image_name.registry == registry + assert image_name.tag == tag + + class TestDockerfileParser(object): def test_all_versions_match(self): def read_version(fp, regex):