Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make sure to clean-up depth-first #20

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion archive_upload/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.1.0"
__version__ = "1.1.1"
146 changes: 116 additions & 30 deletions archive_upload/handlers/dsmc_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import socket
import stat
import subprocess
import shutil
import tarfile
Expand All @@ -31,6 +32,13 @@ class BaseDsmcHandler(BaseRestHandler):
Base handler for dsmc upload operations.
"""

# Template for the executable bash wrapper scripts that commands are written
# into before being handed to the runner service. The shebang must be the very
# first bytes of the file — a leading blank line would make the kernel ignore
# it when the script is executed directly. "set -e" aborts the wrapper on the
# first failing command so partial cleanups are not silently skipped.
WRAPPER_TEMPLATE = """#!/usr/bin/env bash

set -e

{}
"""
def initialize(self, config, runner_service):
"""
Ensures that any parameters feed to this are available
Expand Down Expand Up @@ -117,6 +125,22 @@ def write_error(self, status_code, **kwargs):
"msg": self._reason}
self.finish(response_data)

@staticmethod
def write_command_to_wrapper(cmd, wrapper):
    """
    Write the supplied shell command into an executable bash wrapper script.

    The command is embedded in WRAPPER_TEMPLATE and the resulting script is
    made executable so the runner service can launch it directly.

    :param cmd: the shell command (string) to embed in the wrapper
    :param wrapper: path to the wrapper script file that will be created
    """
    with open(wrapper, "w") as fh:
        fh.write(
            BaseDsmcHandler.WRAPPER_TEMPLATE.format(cmd)
        )

    # add execute permission for user, group and others on top of the
    # mode the file was created with (attribute access on the stat result
    # replaces the dated os.stat(...)[stat.ST_MODE] indexing idiom)
    os.chmod(
        wrapper,
        os.stat(wrapper).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    )


class VersionHandler(BaseDsmcHandler):

"""
Expand Down Expand Up @@ -585,8 +609,24 @@ def post(self, runfolder_archive):
path_to_archive, filename, filename)
log.info("Generating checksums for {}".format(path_to_archive))
log.debug("Will now execute command {}".format(cmd))

wrapper = os.path.abspath(
os.path.join(
path_to_archive_root,
"{}.wrapper.checksum.sh".format(
runfolder_archive
)
)
)
self.write_command_to_wrapper(cmd, wrapper)

job_id = self.runner_service.start(
cmd, nbr_of_cores=1, run_dir=log_dir, stdout=checksum_log, stderr=checksum_log)
wrapper,
nbr_of_cores=1,
run_dir=log_dir,
stdout=checksum_log,
stderr=checksum_log
)

status_end_point = "{0}://{1}{2}".format(
self.request.protocol,
Expand Down Expand Up @@ -782,12 +822,19 @@ def _process_comma_separated_param(param_name):
cmd = self._create_archive_cmd(
path_to_runfolder, path_to_archive, exclude_dirs, exclude_extensions)
log.info("run command: {}".format(cmd))

log_dir = os.path.abspath(self.config["log_directory"])
archive_log = os.path.abspath(os.path.join(log_dir, "create_archive.log"))

wrapper = os.path.abspath(
os.path.join(
path_to_archive_root,
"{}.wrapper.create.sh".format(runfolder)
)
)
self.write_command_to_wrapper(cmd, wrapper)

job_id = self.runner_service.start(
cmd,
wrapper,
nbr_of_cores=1,
run_dir=log_dir,
stdout=archive_log,
Expand Down Expand Up @@ -835,39 +882,67 @@ def _create_tarball_cmd(tarball_name, path_to_archive, exclude_from_tarball):
"--file={} " \
"{} " \
".".format(
path_to_archive,
tarball_name,
tarball_name,
exclude_patterns
)
path_to_archive,
tarball_name,
tarball_name,
exclude_patterns
)

@staticmethod
def _remove_tarballed_files_cmd(path_to_archive, tarball_name):
return "cd {} && " \
"tar " \
"--list " \
"--file={} |" \
def _remove_tarballed_files_cmd(tarball_list_file):
# list all non-directory paths, starting at the deepest level, filter them against the
# tarball contents and remove the paths that have been added to the tarball
# grep -x ensures that the entire line is matched against the patterns in the file given
# by -f (which will be the list of files and folders in the tar archive)
# -n1 ensures that xargs will process each entry separately and -I% assigns the entry to
# the variable "%" which can be referenced in the rm expression
return "find . " \
"-depth " \
"-not -type d |" \
"grep " \
"-x " \
"-f {} |" \
"xargs " \
"-n1 " \
"rm -f".format(
path_to_archive,
tarball_name
)
"-I% " \
"rm -f \"%\"".format(
tarball_list_file
)
b97pla marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def _remove_empty_dirs_cmd(path_to_archive):
return "cd {} && " \
"find " \
". " \
"-depth " \
def _remove_empty_dirs_cmd(tarball_list_file):
# list all directory paths, starting at the deepest level, filter them against the
# tarball contents and remove the paths that have been added to the tarball
# grep -x ensures that the entire line is matched against the patterns in the file given
# by -f (which will be the list of files and folders in the tar archive)
# -n1 ensures that xargs will process each entry separately and -I% assigns the entry to
# the variable "%" which can be referenced in the rm expression
return "find . " \
"-mindepth 1 " \
"-depth " \
"-type d |" \
"grep " \
"-x " \
"-f {} |" \
"xargs " \
"-n1 " \
"-I% " \
"rmdir " \
"--ignore-fail-on-non-empty".format(
path_to_archive
)
"--ignore-fail-on-non-empty " \
"\"%\"".format(
tarball_list_file
)

@staticmethod
def _list_tarfile_contents(tarball_name, tarball_list_file):
return "tar " \
"--list " \
"--file={} |" \
"sed -re 's#/$##' > " \
"{}".format(
tarball_name,
tarball_list_file
)

def post(self, archive):
"""
Expand All @@ -889,6 +964,7 @@ def post(self, archive):

tarball_name = "{}.tar.gz".format(archive)
tarball_path = os.path.join(path_to_archive, tarball_name)
tarball_list_file = "{}.list".format(path_to_archive)

log.debug("Checking to see if {} exists".format(tarball_path))

Expand All @@ -897,16 +973,18 @@ def post(self, archive):
raise ArchiveException(reason=msg, status_code=400)

exclude_from_tarball = self.config["exclude_from_tarball"]
cmd = " ( {} ) && ( {} ) ; ( {} )".format(
cmd = "{}\n{}\n{}\n{}".format(
self._create_tarball_cmd(
tarball_name,
path_to_archive,
exclude_from_tarball),
self._list_tarfile_contents(
tarball_name,
tarball_list_file),
self._remove_tarballed_files_cmd(
path_to_archive,
tarball_name),
tarball_list_file),
self._remove_empty_dirs_cmd(
path_to_archive)
tarball_list_file)
)

log.info("run command: {}".format(cmd))
Expand All @@ -915,11 +993,19 @@ def post(self, archive):
tarball_path,
path_to_archive_root))

wrapper = os.path.abspath(
os.path.join(
path_to_archive_root,
"{}.wrapper.compress.sh".format(archive)
)
)
self.write_command_to_wrapper(cmd, wrapper)

log_dir = os.path.abspath(self.config["log_directory"])
tarball_log = os.path.abspath(os.path.join(log_dir, "compress_archive.log"))

job_id = self.runner_service.start(
cmd,
wrapper,
nbr_of_cores=1,
run_dir=log_dir,
stdout=tarball_log,
Expand Down
33 changes: 29 additions & 4 deletions tests/test_dsmc_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,43 @@ def test_generate_checksum(self, mock_start, mock_status):
mock_start.return_value = job_id
mock_status.return_value = State.DONE

path_to_archive = os.path.abspath(os.path.join(self.dummy_config["path_to_archive_root"], "test_archive"))
archive_name = "test_archive"
path_to_archive = os.path.abspath(
os.path.join(
self.dummy_config["path_to_archive_root"],
archive_name
)
)
log_dir = os.path.abspath(self.dummy_config["log_directory"])
checksum_log = os.path.join(log_dir, "checksum.log")
filename = "checksums_prior_to_pdc.md5"

json_resp = self.poll_status(self.API_BASE + "/gen_checksums/test_archive")
json_resp = self.poll_status(
self.API_BASE + "/gen_checksums/{}".format(
archive_name
)
)

self.assertEqual(json_resp["state"], State.DONE)
self.assertEqual(int(json_resp["job_id"]), job_id)

expected_cmd = "cd {} && /usr/bin/find -L . -type f ! -path './{}' -exec /usr/bin/md5sum {{}} + > {}".format(path_to_archive, filename, filename)
mock_start.assert_called_with(self.runner_service, expected_cmd, nbr_of_cores=1, run_dir=log_dir, stdout=checksum_log, stderr=checksum_log)
wrapper = os.path.abspath(
os.path.join(
os.path.dirname(path_to_archive),
"{}.wrapper.checksum.sh".format(
archive_name
)
)
)
expected_cmd = wrapper
mock_start.assert_called_with(
self.runner_service,
expected_cmd,
nbr_of_cores=1,
run_dir=log_dir,
stdout=checksum_log,
stderr=checksum_log
)

def test_reupload_handler(self):
job_id = 27
Expand Down