Skip to content

Commit

Permalink
add imageName class
Browse files Browse the repository at this point in the history
Signed-off-by: Tim van Katwijk <[email protected]>
  • Loading branch information
tim-vk committed Dec 26, 2022
1 parent b623023 commit 7d4dfe4
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 4 deletions.
6 changes: 4 additions & 2 deletions dockerfile_parse/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from .constants import DOCKERFILE_FILENAME, COMMENT_INSTRUCTION
from .util import (b2u, extract_key_values, get_key_val_dictionary,
u2b, Context, WordSplitter)
u2b, Context, WordSplitter, ImageName)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -877,7 +877,9 @@ def image_from(from_value):
)?
""")
match = re.match(regex, from_value)
return match.group('image', 'name') if match else (None, None)
image = ImageName.parse(match.group('image')) if match else None
name = match.group('name') if match else None
return image, name


def _endline(line):
Expand Down
120 changes: 119 additions & 1 deletion dockerfile_parse/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, s, args=None, envs=None):
:param envs: dict, environment variables to use; if None, do not
attempt substitution
"""
self.stream = StringIO(s)
self.stream = StringIO(str(s))
self.args = args
self.envs = envs

Expand Down Expand Up @@ -326,3 +326,121 @@ def get_values(self, context_type):
if context_type.upper() == "LABEL":
return self.labels
raise ValueError("Unexpected context type: " + context_type)


class ImageName(object):
"""Represent an image.
Naming Conventions
==================
registry.somewhere/namespace/image_name:tag
|-----------------| registry, reg_uri
|---------| namespace
|--------------------------------------| repository
|--------------------| image name
|--| tag
|------------------------| image
|------------------------------------------| image
"""

def __init__(self, registry=None, namespace=None, repo=None, tag=None):
self.registry = registry
self.namespace = namespace
self.repo = repo
self.tag = tag

@classmethod
def parse(cls, image_name):
result = cls()

if not image_name or str(image_name).isspace():
return ImageName()

if isinstance(image_name, cls):
return image_name

# registry.org/namespace/repo:tag
s = image_name.split('/', 2)

if len(s) == 2:
if '.' in s[0] or ':' in s[0]:
result.registry = s[0] if s[0] else None
else:
result.namespace = s[0]
elif len(s) == 3:
result.registry = s[0] if s[0] else None
result.namespace = s[1]
result.repo = s[-1]

for sep in '@:':
try:
result.repo, result.tag = result.repo.rsplit(sep, 1)
except ValueError:
continue
break

return result

def to_str(self, registry=True, tag=True, explicit_tag=False,
explicit_namespace=False):
if self.repo is None:
raise RuntimeError('No image repository specified')

result = self.get_repo(explicit_namespace)

if tag and self.tag and ':' in self.tag:
result = '{0}@{1}'.format(result, self.tag)
elif tag and self.tag:
result = '{0}:{1}'.format(result, self.tag)
elif tag and explicit_tag:
result = '{0}:{1}'.format(result, 'latest')

if registry and self.registry:
result = '{0}/{1}'.format(self.registry, result)

return result

def get_repo(self, explicit_namespace=False):
result = self.repo
if self.namespace:
result = '{0}/{1}'.format(self.namespace, result)
elif explicit_namespace:
result = '{0}/{1}'.format('library', result)
return result

def enclose(self, organization):
if self.namespace == organization:
return

repo_parts = [self.repo]
if self.namespace:
repo_parts.insert(0, self.namespace)

self.namespace = organization
self.repo = '-'.join(repo_parts)

def __str__(self):
return self.to_str(registry=True, tag=True)

def __repr__(self):
return (
"ImageName(registry={s.registry!r}, namespace={s.namespace!r},"
" repo={s.repo!r}, tag={s.tag!r})"
).format(s=self)

def __eq__(self, other):
if isinstance(other, str):
return self.__str__() == other
elif isinstance(other, ImageName):
return self.__dict__ == other.__dict__
else:
return NotImplemented

def __hash__(self):
return hash(self.to_str())

def copy(self):
return ImageName(
registry=self.registry,
namespace=self.namespace,
repo=self.repo,
tag=self.tag)
112 changes: 111 additions & 1 deletion tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from dockerfile_parse import DockerfileParser
from dockerfile_parse.parser import image_from
from dockerfile_parse.constants import COMMENT_INSTRUCTION
from dockerfile_parse.util import b2u, u2b, Context
from dockerfile_parse.util import b2u, u2b, Context, ImageName
from tests.fixtures import dfparser, instruction

NON_ASCII = "žluťoučký"
Expand All @@ -28,6 +28,116 @@
instruction = instruction # pylint: disable=self-assigning-variable


@pytest.mark.parametrize(('image_string', 'dictionary'), [
(
" ",
{"namespace": None, "registry": None, "tag": None, "repo": None},
), (
"registry.org/namespace/repo:tag",
{"namespace": "namespace", "registry": "registry.org", "tag": "tag", "repo": "repo"},
), (
"/namespace/repo:tag",
{"namespace": "namespace", "registry": None, "tag": "tag", "repo": "repo"},
), (
"registry.org/repo:tag",
{"namespace": None, "registry": "registry.org", "tag": "tag", "repo": "repo"},
), (
"registry.org/repo",
{"namespace": None, "registry": "registry.org", "tag": None, "repo": "repo"},
), (
"registry.org/repo@sha256:hash",
{"namespace": None, "registry": "registry.org", "tag": "sha256:hash", "repo": "repo"},
)
])
class TestImageName(object):
def test_util_image_name_parse(self, image_string, dictionary):
image = ImageName.parse(image_string)
assert image.namespace == dictionary["namespace"]
assert image.registry == dictionary["registry"]
assert image.tag == dictionary["tag"]
assert image.repo == dictionary["repo"]

def test_util_image_name_get_repo(self, image_string, dictionary):
image = ImageName.parse(image_string)
repo = "/".join(filter(None, (dictionary["namespace"], dictionary["repo"])))
assert image.get_repo() == (repo if repo != "" else None)
assert image.get_repo(explicit_namespace=True) == "{0}/{1}".format(
dictionary["namespace"] if dictionary["namespace"] else "library", dictionary["repo"])

def test_util_image_name_to_str(self, image_string, dictionary):
image = ImageName.parse(image_string)
if dictionary["repo"] is None:
with pytest.raises(RuntimeError):
image.to_str()
else:
assert image.to_str() == image_string.lstrip('/')
assert image.to_str() == (image_string.lstrip('/') if image_string else None)
assert image.to_str(explicit_tag=True) == \
image_string.lstrip('/') + (':latest' if dictionary["tag"] is None else "")

def test_image_name_hash(self, image_string, dictionary):
image = ImageName.parse(image_string)
if dictionary["repo"] is None:
with pytest.raises(RuntimeError):
hash(image)
else:
hash(image)

def test_image_name_repr(self, image_string, dictionary):
# so linter won't trip on unused argument
del dictionary
image = ImageName.parse(image_string)
repr(image)

def test_image_name_comparison(self, image_string, dictionary):
# make sure that "==" is implemented correctly on both Python major releases
i1 = ImageName.parse(image_string)
i2 = ImageName(registry=dictionary["registry"], namespace=dictionary["namespace"],
repo=dictionary["repo"],
tag=dictionary["tag"])
assert i1 == i2

i2 = ImageName(registry='foo.com', namespace='spam', repo='bar', tag='2')
# pylint: disable=unneeded-not
assert not i1 == i2

i1 = ImageName.parse(i2)
assert i1 == i2

i1 = i2.copy()
assert i1 == i2


@pytest.mark.parametrize(('repo', 'organization', 'enclosed_repo'), (
('fedora', 'spam', 'spam/fedora'),
('spam/fedora', 'spam', 'spam/fedora'),
('spam/fedora', 'maps', 'maps/spam-fedora'),
))
@pytest.mark.parametrize('registry', (
'example.registry.com',
'example.registry.com:8888',
None,
))
@pytest.mark.parametrize('tag', ('bacon', None))
def test_image_name_enclose(repo, organization, enclosed_repo, registry, tag):
reference = repo
if tag:
reference = '{}:{}'.format(repo, tag)
if registry:
reference = '{}/{}'.format(registry, reference)

image_name = ImageName.parse(reference)
assert image_name.get_repo() == repo
assert image_name.registry == registry
assert image_name.tag == tag

image_name.enclose(organization)
assert image_name.get_repo() == enclosed_repo
# Verify that registry and tag are unaffected
assert image_name.registry == registry
assert image_name.tag == tag


class TestDockerfileParser(object):
def test_all_versions_match(self):
def read_version(fp, regex):
Expand Down

0 comments on commit 7d4dfe4

Please sign in to comment.