Skip to content

Commit

Permalink
Remove temporary bash scripts in compression details
Browse files Browse the repository at this point in the history
This replaces the temporary bash scripts used for getting the version
of the compression tools with the corresponding helper functions.

The get_7z_version helper has also been updated to be compatible
with newer versions of the 7z command.
  • Loading branch information
replaceafill authored Feb 19, 2025
1 parent 395df96 commit 771631d
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 67 deletions.
37 changes: 15 additions & 22 deletions storage_service/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,48 +430,41 @@ def get_compressed_package_checksum(pointer_path):
return (checksum, checksum_algorithm)


def get_tool_info_command(compression):
"""Return command for outputting compression tool details
def get_tool_info(compression):
"""Return compression tool details
:param compression: one of the constants in ``COMPRESSION_ALGORITHMS``.
:returns: command in string format
:returns: tool details in string format
"""
if compression in (COMPRESSION_TAR, COMPRESSION_TAR_BZIP2, COMPRESSION_TAR_GZIP):
program = "tar"
algo = {COMPRESSION_TAR_BZIP2: "-j", COMPRESSION_TAR_GZIP: "-z"}.get(
compression, ""
)

tool_info_command = (
'echo program="tar"\\; '
f'algorithm="{algo}"\\; '
'version="`tar --version | grep tar`"'
)
version = get_tar_version()
elif compression in (COMPRESSION_7Z_BZIP, COMPRESSION_7Z_LZMA, COMPRESSION_7Z_COPY):
program = "7z"
algo = {
COMPRESSION_7Z_BZIP: COMPRESS_ALGO_BZIP2,
COMPRESSION_7Z_LZMA: COMPRESS_ALGO_LZMA,
COMPRESSION_7Z_COPY: COMPRESS_ALGO_7Z_COPY,
}.get(compression, "")
tool_info_command = (
"#!/bin/bash\n"
'echo program="7z"\\; '
f'algorithm="{algo}"\\; '
'version="`7z | grep Version`"'
)
version = get_7z_version()
else:
raise NotImplementedError(
_("Algorithm %(algorithm)s not implemented") % {"algorithm": compression}
)

return tool_info_command
return f"program={program}; algorithm={algo}; version={version}"


def get_7z_version():
return [
line
for line in subprocess.check_output("7z").splitlines()
if b"Version" in line
][0].decode("utf8")
lines = subprocess.check_output("7z").decode().splitlines()
if lines[2].startswith("p7zip Version"):
# 7-Zip 16.02: return only version line.
return lines[2]
else:
# 7-Zip 23.01: merge and return copyright and architecture lines.
return "".join(lines[1:3])


def get_tar_version():
Expand Down
25 changes: 9 additions & 16 deletions storage_service/locations/models/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -1700,32 +1700,25 @@ def compress_package(self, algorithm, extract_path=None, detailed_output=False):

LOGGER.info("Compressing package with: %s to %s", command, compressed_filename)
if detailed_output:
tool_info_command = utils.get_tool_info_command(algorithm)
p = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
rc = p.returncode
LOGGER.debug("Compress package RC: %s", rc)

with tempfile.NamedTemporaryFile(
encoding="utf-8", mode="wt", delete=False
) as tmpfile:
os.chmod(tmpfile.name, 0o770)
tmpfile.write(tool_info_command)
tmpfile.close()
tic_cmd = [tmpfile.name]
p = subprocess.Popen(
tic_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
tic_stdout, tic_stderr = p.communicate()
os.unlink(tmpfile.name)
tic_stdout = ""
tic_stderr = ""
try:
tic_stdout = utils.get_tool_info(algorithm)
except subprocess.CalledProcessError as e:
tic_stderr = e.stderr.decode()
except Exception as e:
tic_stderr = str(e)
LOGGER.debug("Tool info stdout")
LOGGER.debug(tool_info_command)
LOGGER.debug(tic_stdout)
LOGGER.debug(tic_stderr)
details = {
"event_detail": tic_stdout.decode("utf-8"),
"event_detail": tic_stdout,
"event_outcome_detail_note": 'Standard Output="{}"; Standard Error="{}"'.format(
stdout.decode("utf-8"), stderr.decode("utf-8")
),
Expand Down
34 changes: 30 additions & 4 deletions tests/common/test_command_import_aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from unittest import mock

import pytest
from common import utils
from django.core.management import call_command
from locations import models
from lxml import etree

TEST_DIR = pathlib.Path(__file__).resolve().parent
FIXTURES_DIR = TEST_DIR / "fixtures"
Expand Down Expand Up @@ -62,19 +64,30 @@ def test_import_aip_command_creates_uncompressed_package(


@pytest.mark.parametrize(
"compression_algorithm",
"compression_algorithm,expected_event_detail_algorithm",
[
("7z with bzip"),
("7z without compression"),
("7z with bzip", utils.COMPRESS_ALGO_BZIP2),
("7z without compression", utils.COMPRESS_ALGO_7Z_COPY),
],
)
@pytest.mark.django_db
@mock.patch("os.chown")
@mock.patch("common.management.commands.import_aip.getpwnam")
@mock.patch("logging.config")
@mock.patch("common.utils.get_7z_version")
def test_import_aip_command_creates_compressed_package(
logging_config, getpwnam, chown, capsys, aip_storage_location, compression_algorithm
get_7z_version,
logging_config,
getpwnam,
chown,
capsys,
aip_storage_location,
compression_algorithm,
expected_event_detail_algorithm,
):
expected_event_detail_version = "p7zip Version 3.0"
get_7z_version.return_value = expected_event_detail_version

call_command(
"import_aip",
"--decompress-source",
Expand All @@ -95,6 +108,19 @@ def test_import_aip_command_creates_compressed_package(

assert package.is_compressed

# Verify the pointer file contains the compression event.
assert package.full_pointer_file_path is not None
root = etree.parse(package.full_pointer_file_path)
event_details = root.xpath(
".//premis3:eventType[text()='compression']/../premis3:eventDetailInformation/premis3:eventDetail",
namespaces=utils.NSMAP,
)
assert len(event_details) == 1
assert (
event_details[0].text.strip()
== f"program=7z; algorithm={expected_event_detail_algorithm}; version={expected_event_detail_version}"
)


@pytest.mark.django_db
@mock.patch("os.chown")
Expand Down
99 changes: 74 additions & 25 deletions tests/common/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pathlib
import re
import shutil
import tarfile
from collections import namedtuple
Expand All @@ -13,9 +14,9 @@
FIXTURES_DIR = TEST_DIR / "fixtures"

# Until further work is done to bring compression into its own module we can
# use these constants for this test, but we can do better.
PROG_VERS_7Z = "7z"
PROG_VERS_TAR = "tar"
# use these regular expression patterns for this test, but we can do better.
PROG_VERS_7Z = r"(p7zip|7-Zip)"
PROG_VERS_TAR = r"tar"

# Specifically string types for the tuple we create.
COMPRESS_ORDER_ONE = "1"
Expand Down Expand Up @@ -87,58 +88,106 @@ def test_get_compress_command(compression, command):


@pytest.mark.parametrize(
"compression,command",
"compression,expected_program,expected_algorithm",
[
(
utils.COMPRESSION_7Z_BZIP,
'#!/bin/bash\necho program="7z"\\; algorithm="bzip2"\\; version="`7z | grep Version`"',
),
(utils.COMPRESSION_7Z_BZIP, "7z", utils.COMPRESS_ALGO_BZIP2),
(
utils.COMPRESSION_7Z_LZMA,
'#!/bin/bash\necho program="7z"\\; algorithm="lzma"\\; version="`7z | grep Version`"',
"7z",
utils.COMPRESS_ALGO_LZMA,
),
(
utils.COMPRESSION_7Z_COPY,
'#!/bin/bash\necho program="7z"\\; algorithm="copy"\\; version="`7z | grep Version`"',
"7z",
utils.COMPRESS_ALGO_7Z_COPY,
),
(
utils.COMPRESSION_TAR,
'echo program="tar"\\; algorithm=""\\; version="`tar --version | grep tar`"',
"tar",
"",
),
(
utils.COMPRESSION_TAR_GZIP,
'echo program="tar"\\; algorithm="-z"\\; version="`tar --version | grep tar`"',
"tar",
"-z",
),
(
utils.COMPRESSION_TAR_BZIP2,
'echo program="tar"\\; algorithm="-j"\\; version="`tar --version | grep tar`"',
"tar",
"-j",
),
],
)
def test_get_tool_info_command(compression, command):
cmd = utils.get_tool_info_command(compression)
assert cmd == command, (
f"Incorrect tool info: {cmd} returned for compression input {compression}"
@mock.patch("subprocess.check_output")
def test_get_tool_info(check_output, compression, expected_program, expected_algorithm):
if expected_program == "7z":
expected_version = "p7zip Version 16.02"
command_output = b"\n".join(
[
b"",
b"7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21",
expected_version.encode(),
]
)
elif expected_program == "tar":
expected_version = "tar (GNU tar) 1.35"
command_output = b"\n".join(
[
expected_version.encode(),
b"Copyright (C) 2023 Free Software Foundation, Inc.",
]
)
else:
raise AssertionError(f"unexpected program {expected_program}")
check_output.return_value = command_output
expected_output = f"program={expected_program}; algorithm={expected_algorithm}; version={expected_version}"

output = utils.get_tool_info(compression)

assert output == expected_output, (
f"Incorrect tool info: {output} returned for compression input {compression}"
)


def test_get_tool_info_fails_if_compression_algorithm_is_not_implemented():
    """Check that ``utils.get_tool_info`` rejects an unrecognised algorithm.

    The call is expected to raise ``NotImplementedError`` with a message
    that names the offending compression value.
    """
    unsupported = "unknown and random"
    expected_message = "Algorithm unknown and random not implemented"

    with pytest.raises(NotImplementedError, match=expected_message):
        utils.get_tool_info(unsupported)


@pytest.mark.parametrize(
"compression,cmd_output,expected_detail",
[
(
utils.COMPRESSION_7Z_BZIP,
"7z command\nVersion 3.0\nsomething else",
'program="7z"; version="Version 3.0"',
"\n7z command\np7zip Version 3.0\nsomething else",
'program="7z"; version="p7zip Version 3.0"',
),
(
utils.COMPRESSION_7Z_BZIP,
"\n7-Zip 23.01 (x64)\n 64-bit locale=C.UTF-8\nsomething else",
'program="7z"; version="7-Zip 23.01 (x64) 64-bit locale=C.UTF-8"',
),
(
utils.COMPRESSION_7Z_LZMA,
"7z command\nVersion 3.0\nsomething else",
'program="7z"; version="Version 3.0"',
"\n7z command\np7zip Version 3.0\nsomething else",
'program="7z"; version="p7zip Version 3.0"',
),
(
utils.COMPRESSION_7Z_LZMA,
"\n7-Zip 23.01 (x64)\n 64-bit locale=C.UTF-8\nsomething else",
'program="7z"; version="7-Zip 23.01 (x64) 64-bit locale=C.UTF-8"',
),
(
utils.COMPRESSION_7Z_COPY,
"\n7z command\np7zip Version 3.0\nsomething else",
'program="7z"; version="p7zip Version 3.0"',
),
(
utils.COMPRESSION_7Z_COPY,
"7z command\nVersion 3.0\nsomething else",
'program="7z"; version="Version 3.0"',
"\n7-Zip 23.01 (x64)\n 64-bit locale=C.UTF-8\nsomething else",
'program="7z"; version="7-Zip 23.01 (x64) 64-bit locale=C.UTF-8"',
),
(
utils.COMPRESSION_TAR,
Expand Down Expand Up @@ -171,7 +220,7 @@ def test_get_compression_event_detail(


@pytest.mark.parametrize(
"compression, version,extension,program_name,transform",
"compression,version,extension,program_name,transform",
[
(
utils.COMPRESSION_7Z_BZIP,
Expand Down Expand Up @@ -256,7 +305,7 @@ def test_get_format_info(compression, version, extension, program_name, transfor
"""
fsentry = FSEntry()
vers, ext, prog_name = utils.set_compression_transforms(fsentry, compression, 1)
assert version in vers
assert re.search(version, vers) is not None
assert ext == extension
assert program_name in prog_name
assert fsentry.transform_files == transform
Expand Down

0 comments on commit 771631d

Please sign in to comment.