diff --git a/bin/desi_tucson_transfer_catchup.sh b/bin/desi_tucson_transfer_catchup.sh index a9092bf..cab1499 100755 --- a/bin/desi_tucson_transfer_catchup.sh +++ b/bin/desi_tucson_transfer_catchup.sh @@ -22,7 +22,7 @@ src=rsync://${DESISYNC_HOSTNAME}/desi dst=${DESI_ROOT} log_root=${HOME}/Documents/Logfiles # -# Execute rsync commands. +# Execute rsync commands. Do not exceed 10 commands! # for d in engineering/focalplane engineering/focalplane/hwtables \ spectro/data \ @@ -30,12 +30,12 @@ for d in engineering/focalplane engineering/focalplane/hwtables \ spectro/nightwatch/kpno spectro/staging/lost+found; do case ${d} in engineering/focalplane) priority='nice'; exclude='--exclude archive --exclude hwtables --exclude *.ipynb --exclude .ipynb_checkpoints' ;; - engineering/focalplane/hwtables) priority='nice'; exclude='--include *.csv --exclude *' ;; + # engineering/focalplane/hwtables) priority='nice'; exclude='--include *.csv --exclude *' ;; spectro/data) priority=''; exclude='--exclude 2018* --exclude 2019* --exclude 2020* --exclude 2021* --exclude 2022* --exclude 2023*' ;; spectro/nightwatch/kpno) priority='nice'; exclude='--exclude 2021* --exclude 2022* --exclude 2023*' ;; spectro/redux/daily) priority=''; exclude='--exclude *.tmp --exclude attic --exclude exposures --exclude preproc --exclude temp --exclude tiles' ;; - spectro/redux/daily/exposures) priority=''; exclude='--exclude *.tmp' ;; - spectro/redux/daily/preproc) priority=''; exclude='--exclude *.tmp --exclude preproc-*.fits --exclude preproc-*.fits.gz' ;; + spectro/redux/daily/exposures) priority=''; exclude='--exclude 2019* --exclude 2020* --exclude 2021* --exclude 2022* --exclude 2023* --exclude *.tmp' ;; + spectro/redux/daily/preproc) priority=''; exclude='--exclude 2019* --exclude 2020* --exclude 2021* --exclude 2022* --exclude 2023* --exclude *.tmp --exclude preproc-*.fits --exclude preproc-*.fits.gz' ;; spectro/redux/daily/tiles) priority=''; exclude='--exclude *.tmp --exclude temp' ;; *) 
@patch('subprocess.Popen')
@patch('desitransfer.tucson.log')
@patch('desitransfer.tucson.priority')
def test_get_proc(self, mock_priority, mock_log, mock_popen):
    """Test the function for generating external procedures.

    The queue ``a`` .. ``i`` is drained one call at a time.  ``d`` and ``g``
    are excluded, ``e`` and ``f`` are treated as priority directories, and
    ``b`` and ``c`` are requested in test mode, so no log file is opened and
    nothing is logged at INFO level for them.
    """
    home = os.environ['HOME']
    directories = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']
    exclude = {'d', 'g'}
    # Only membership tests are performed on the patched ``priority`` object.
    mock_priority.__contains__ = lambda self, x: x == 'e' or x == 'f'
    options = MagicMock()
    options.test = False
    options.log = self.temp_dir

    def rsync_call(name):
        # Expected INFO message for a real (non-test) transfer of ``name``.
        return call(f'/usr/bin/rsync --archive --checksum --verbose --delete --delete-after --no-motd --password-file {home}/.desi /src/{name}/ /dst/{name}/')

    def nice_call(name):
        # Expected INFO message for a low-priority transfer of ``name``.
        return call("Directory '%s' will be transferred with os.nice(%d)", name, 5)

    proc, LOG_A, d = _get_proc(directories, exclude, '/src', '/dst', options)
    # Every real log file handle is registered for cleanup so that the
    # temporary directory can be removed in tearDownClass().
    self.addCleanup(LOG_A.close)
    self.assertEqual(d, 'a')
    options.test = True
    proc, LOG_B, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.assertEqual(d, 'b')
    # In test mode the second element is the log file *name*, not a handle.
    self.assertEqual(LOG_B, os.path.join(self.temp_dir, 'desi_tucson_transfer_b.log'))
    proc, LOG_C, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.assertEqual(d, 'c')
    options.test = False
    proc, LOG_E, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.addCleanup(LOG_E.close)
    self.assertEqual(d, 'e')
    proc, LOG_F, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.addCleanup(LOG_F.close)
    self.assertEqual(d, 'f')
    proc, LOG_H, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.addCleanup(LOG_H.close)
    self.assertEqual(d, 'h')
    proc, LOG_I, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.addCleanup(LOG_I.close)
    self.assertEqual(d, 'i')
    # The queue is now exhausted: every element of the tuple is None.
    proc, LOG_J, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.assertIsNone(proc)
    self.assertIsNone(LOG_J)
    self.assertIsNone(d)
    self.assertEqual(directories, [])
    mock_log.info.assert_has_calls([rsync_call('a'),
                                    nice_call('a'),
                                    rsync_call('e'),
                                    rsync_call('f'),
                                    rsync_call('h'),
                                    nice_call('h'),
                                    rsync_call('i'),
                                    nice_call('i')])
    mock_log.warning.assert_has_calls([call('%s skipped at user request.', 'd'),
                                       call('%s skipped at user request.', 'g')])
+79,13 @@ "--include", "gaiadr2", "--include", "subpriority", "--exclude", "*"]} +priority = ('spectro/data', + 'spectro/redux/daily', + 'spectro/redux/daily/exposures', + 'spectro/redux/daily/preproc', + 'spectro/redux/daily/tiles') + + def _configure_log(debug): """Re-configure the default logger returned by ``desiutil.log``. @@ -130,6 +138,9 @@ def _options(): prsr.add_argument('-l', '--log', metavar='DIR', default=os.path.join(os.environ['HOME'], 'Documents', 'Logfiles'), help='Use DIR for log files (default %(default)s).') + prsr.add_argument('-p', '--processes', action='store', type=int, + dest='nproc', metavar="N", default=10, + help="Number of simultaneous downloads (default %(default)s).") prsr.add_argument('-s', '--static', action='store_true', dest='static', help='Also sync static data sets.') prsr.add_argument('-S', '--sleep', metavar='TIME', default='15m', dest='sleep', @@ -166,6 +177,61 @@ def _rsync(src, dst, d, checksum=False): return cmd +def _get_proc(directories, exclude, src, dst, options, nice=5): + """Prepare the next download directory for processing. + + Parameters + ---------- + directories : :class:`list` + A list of directories to process. + exclude : :class:`set` + Do not process directories in this set. + src : :class:`str` + Root source directory. + dst : :class:`str` + Root destination directory. + options : :class:`argparse.Namespace` + The parsed command-line options. + nice : :class:`int`, optional. + Lower-priority transfers will be run with this value passed to :func:`os.nice`, + default 5. + + Returns + ------- + :class:`tuple` + A tuple containing information about the process. 
+ """ + global log + + def preexec_nice(): # pragma: no cover + os.nice(nice) + + def preexec_pass(): # pragma: no cover + pass + + try: + d = directories.pop(0) + while d in exclude: + log.warning("%s skipped at user request.", d) + d = directories.pop(0) + log_file = os.path.join(options.log, + 'desi_tucson_transfer_' + d.replace('/', '_') + '.log') + command = _rsync(src, dst, d, checksum=options.checksum) + if options.test: + return (command, log_file, d) + else: + log.info(' '.join(command)) + LOG = open(log_file, 'ab') + if d in priority: + preexec_fn = preexec_pass + else: + log.info("Directory '%s' will be transferred with os.nice(%d)", d, nice) + preexec_fn = preexec_nice + return (sub.Popen(command, preexec_fn=preexec_fn, stdout=LOG, stderr=sub.STDOUT), LOG, d) + except IndexError: + return (None, None, None) + + def running(pid_file): """Test for a duplicate process already running. @@ -216,8 +282,15 @@ def main(): try: foo = os.environ[e] except KeyError: - log.error("%s must be set!", e) + log.critical("%s must be set!", e) return 1 + + # + # Check other options. + # + if options.nproc > 10: + log.critical("Number of simultaneous transfers %d > 10!", options.nproc) + return 1 # # Source and destination. 
# @@ -226,7 +299,7 @@ def main(): if 'DESI_ROOT' in os.environ: dst = os.environ['DESI_ROOT'] else: - log.error("DESI_ROOT must be set, or destination directory set on the command-line (-d DIR)!") + log.critical("DESI_ROOT must be set, or destination directory set on the command-line (-d DIR)!") return 1 else: dst = options.destination @@ -248,7 +321,7 @@ def main(): try: sleepy_time = int(options.sleep[0:-1]) * suffix[s] except ValueError: - log.error("Invalid value for sleep interval: '%s'!", options.sleep) + log.critical("Invalid value for sleep interval: '%s'!", options.sleep) return 1 log.debug("requests.get('%s')", os.environ['DESISYNC_STATUS_URL']) if not options.test: @@ -266,18 +339,28 @@ def main(): directories = static + dynamic else: directories = dynamic - for d in directories: - if d in exclude: - log.warning("%s skipped at user request.", d) - else: - command = _rsync(src, dst, d, checksum=options.checksum) - log.info(' '.join(command)) - if not options.test: - log_file = os.path.join(options.log, - 'desi_tucson_transfer_' + d.replace('/', '_') + '.log') - with open(log_file, 'ab') as LOG: - proc = sub.Popen(command, stdout=LOG, stderr=sub.STDOUT) - status = proc.wait() + proc_pool = dict() + for p in range(options.nproc): + proc_key = 'proc{0:03d}'.format(p) + proc_pool[proc_key] = _get_proc(directories, exclude, src, dst, options) + while any([v[0] is not None for v in proc_pool.values()]): + for proc_key in proc_pool: + proc, LOG, d = proc_pool[proc_key] + if proc is None: + status = None + else: + if options.test: + log.debug("%s: %s -> %s", d, ' '.join(proc), LOG) + status = 0 + else: + status = proc.poll() + if status is not None: + if not options.test: + LOG.close() if status != 0: log.critical("rsync error detected for %s/%s/! 
Check logs!", dst, d) + proc_pool[proc_key] = _get_proc(directories, exclude, src, dst, options) + if not options.test: + log.debug("Waiting for jobs to complete, sleeping %s.", options.sleep) + time.sleep(sleepy_time) return 0