Skip to content

Commit

Permalink
make sure to clean-up depth-first
Browse files Browse the repository at this point in the history
wrap commands in script
  • Loading branch information
b97pla committed Nov 3, 2023
1 parent e06c6a6 commit e943ad5
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 35 deletions.
2 changes: 1 addition & 1 deletion archive_upload/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.1.0"
__version__ = "1.1.1"
136 changes: 106 additions & 30 deletions archive_upload/handlers/dsmc_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import socket
import stat
import subprocess
import shutil
import tarfile
Expand All @@ -31,6 +32,13 @@ class BaseDsmcHandler(BaseRestHandler):
Base handler for dsmc upload operations.
"""

WRAPPER_TEMPLATE = """
#!/usr/bin/env bash
set -e
{}
"""
def initialize(self, config, runner_service):
"""
Ensures that any parameters feed to this are available
Expand Down Expand Up @@ -117,6 +125,22 @@ def write_error(self, status_code, **kwargs):
"msg": self._reason}
self.finish(response_data)

@staticmethod
def write_command_to_wrapper(cmd, wrapper):

with open(wrapper, "w") as fh:
fh.write(
BaseDsmcHandler.WRAPPER_TEMPLATE.format(cmd)
)

os.chmod(
wrapper,
os.stat(wrapper)[
stat.ST_MODE
] | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
)


class VersionHandler(BaseDsmcHandler):

"""
Expand Down Expand Up @@ -585,8 +609,24 @@ def post(self, runfolder_archive):
path_to_archive, filename, filename)
log.info("Generating checksums for {}".format(path_to_archive))
log.debug("Will now execute command {}".format(cmd))

wrapper = os.path.abspath(
os.path.join(
path_to_archive_root,
"{}.wrapper.checksum.sh".format(
runfolder_archive
)
)
)
self.write_command_to_wrapper(cmd, wrapper)

job_id = self.runner_service.start(
cmd, nbr_of_cores=1, run_dir=log_dir, stdout=checksum_log, stderr=checksum_log)
wrapper,
nbr_of_cores=1,
run_dir=log_dir,
stdout=checksum_log,
stderr=checksum_log
)

status_end_point = "{0}://{1}{2}".format(
self.request.protocol,
Expand Down Expand Up @@ -782,12 +822,19 @@ def _process_comma_separated_param(param_name):
cmd = self._create_archive_cmd(
path_to_runfolder, path_to_archive, exclude_dirs, exclude_extensions)
log.info("run command: {}".format(cmd))

log_dir = os.path.abspath(self.config["log_directory"])
archive_log = os.path.abspath(os.path.join(log_dir, "create_archive.log"))

wrapper = os.path.abspath(
os.path.join(
path_to_archive_root,
"{}.wrapper.create.sh".format(runfolder)
)
)
self.write_command_to_wrapper(cmd, wrapper)

job_id = self.runner_service.start(
cmd,
wrapper,
nbr_of_cores=1,
run_dir=log_dir,
stdout=archive_log,
Expand Down Expand Up @@ -835,39 +882,57 @@ def _create_tarball_cmd(tarball_name, path_to_archive, exclude_from_tarball):
"--file={} " \
"{} " \
".".format(
path_to_archive,
tarball_name,
tarball_name,
exclude_patterns
)
path_to_archive,
tarball_name,
tarball_name,
exclude_patterns
)

@staticmethod
def _remove_tarballed_files_cmd(path_to_archive, tarball_name):
return "cd {} && " \
"tar " \
"--list " \
"--file={} |" \
def _remove_tarballed_files_cmd(tarball_list_file):
# list all non-directory paths, filter them against the tarball contents and
# remove the paths that have been added to the tarball
return "find . " \
"-depth " \
"-not -type d |" \
"grep " \
"-x " \
"-f {} |" \
"xargs " \
"-n1 " \
"rm -f".format(
path_to_archive,
tarball_name
)
"-I% " \
"rm -f \"%\"".format(
tarball_list_file
)

@staticmethod
def _remove_empty_dirs_cmd(path_to_archive):
return "cd {} && " \
"find " \
". " \
"-depth " \
def _remove_empty_dirs_cmd(tarball_list_file):
return "find . " \
"-mindepth 1 " \
"-depth " \
"-type d |" \
"grep " \
"-x " \
"-f {} |" \
"xargs " \
"-n1 " \
"-I% " \
"rmdir " \
"--ignore-fail-on-non-empty".format(
path_to_archive
)
"--ignore-fail-on-non-empty " \
"\"%\"".format(
tarball_list_file
)

@staticmethod
def _list_tarfile_contents(tarball_name, tarball_list_file):
return "tar " \
"--list " \
"--file={} |" \
"sed -re 's#/$##' > " \
"{}".format(
tarball_name,
tarball_list_file
)

def post(self, archive):
"""
Expand All @@ -889,6 +954,7 @@ def post(self, archive):

tarball_name = "{}.tar.gz".format(archive)
tarball_path = os.path.join(path_to_archive, tarball_name)
tarball_list_file = "{}.list".format(path_to_archive)

log.debug("Checking to see if {} exists".format(tarball_path))

Expand All @@ -897,16 +963,18 @@ def post(self, archive):
raise ArchiveException(reason=msg, status_code=400)

exclude_from_tarball = self.config["exclude_from_tarball"]
cmd = " ( {} ) && ( {} ) ; ( {} )".format(
cmd = "{}\n{}\n{}\n{}".format(
self._create_tarball_cmd(
tarball_name,
path_to_archive,
exclude_from_tarball),
self._list_tarfile_contents(
tarball_name,
tarball_list_file),
self._remove_tarballed_files_cmd(
path_to_archive,
tarball_name),
tarball_list_file),
self._remove_empty_dirs_cmd(
path_to_archive)
tarball_list_file)
)

log.info("run command: {}".format(cmd))
Expand All @@ -915,11 +983,19 @@ def post(self, archive):
tarball_path,
path_to_archive_root))

wrapper = os.path.abspath(
os.path.join(
path_to_archive_root,
"{}.wrapper.compress.sh".format(archive)
)
)
self.write_command_to_wrapper(cmd, wrapper)

log_dir = os.path.abspath(self.config["log_directory"])
tarball_log = os.path.abspath(os.path.join(log_dir, "compress_archive.log"))

job_id = self.runner_service.start(
cmd,
wrapper,
nbr_of_cores=1,
run_dir=log_dir,
stdout=tarball_log,
Expand Down
33 changes: 29 additions & 4 deletions tests/test_dsmc_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,43 @@ def test_generate_checksum(self, mock_start, mock_status):
mock_start.return_value = job_id
mock_status.return_value = State.DONE

path_to_archive = os.path.abspath(os.path.join(self.dummy_config["path_to_archive_root"], "test_archive"))
archive_name = "test_archive"
path_to_archive = os.path.abspath(
os.path.join(
self.dummy_config["path_to_archive_root"],
archive_name
)
)
log_dir = os.path.abspath(self.dummy_config["log_directory"])
checksum_log = os.path.join(log_dir, "checksum.log")
filename = "checksums_prior_to_pdc.md5"

json_resp = self.poll_status(self.API_BASE + "/gen_checksums/test_archive")
json_resp = self.poll_status(
self.API_BASE + "/gen_checksums/{}".format(
archive_name
)
)

self.assertEqual(json_resp["state"], State.DONE)
self.assertEqual(int(json_resp["job_id"]), job_id)

expected_cmd = "cd {} && /usr/bin/find -L . -type f ! -path './{}' -exec /usr/bin/md5sum {{}} + > {}".format(path_to_archive, filename, filename)
mock_start.assert_called_with(self.runner_service, expected_cmd, nbr_of_cores=1, run_dir=log_dir, stdout=checksum_log, stderr=checksum_log)
wrapper = os.path.abspath(
os.path.join(
os.path.dirname(path_to_archive),
"{}.wrapper.checksum.sh".format(
archive_name
)
)
)
expected_cmd = wrapper
mock_start.assert_called_with(
self.runner_service,
expected_cmd,
nbr_of_cores=1,
run_dir=log_dir,
stdout=checksum_log,
stderr=checksum_log
)

def test_reupload_handler(self):
job_id = 27
Expand Down

0 comments on commit e943ad5

Please sign in to comment.